Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15823:
URL: https://github.com/apache/kafka/pull/15823#discussion_r1585914471


##
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java:
##
@@ -25,21 +25,21 @@
 import org.apache.kafka.common.requests.FetchRequest.PartitionData;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.LinkedHashMap;

Review Comment:
   Could you reduce the changes to the imports? Also, the new order looks odd to 
me, as it is not in lexicographical order.
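   
   For illustration (a sketch of the convention, not a prescribed diff), keeping the removed imports and slotting the new one in lexicographic order would give:
   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.LinkedHashMap;
   ```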



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15838:
URL: https://github.com/apache/kafka/pull/15838#discussion_r1585911858


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1541,6 +1541,36 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testReduceNumNetworkThreads(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(1).
+setNumControllerNodes(1).build()).
+  setConfigProp(KafkaConfig.NumNetworkThreadsProp, "4").
+  build()
+try {
+  cluster.format()
+  cluster.startup()
+  cluster.waitForReadyBrokers()
+  val admin = Admin.create(cluster.clientProperties())
+  try {
+admin.incrementalAlterConfigs(
+  Collections.singletonMap(new ConfigResource(Type.BROKER, ""),
+    Collections.singletonList(new AlterConfigOp(
+      new ConfigEntry(KafkaConfig.NumNetworkThreadsProp, "2"), OpType.SET)))).all().get()
+val newTopic = Collections.singletonList(new NewTopic("test-topic", 1, 1.toShort))
+val createTopicResult = admin.createTopics(newTopic)
+createTopicResult.all().get()

Review Comment:
   It seems reducing the number of network threads causes a disconnection, which 
triggers a retry of the forwarded request. That can make this test unstable. 
Maybe we can increase the number of network threads instead?
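   
   For example, a sketch of the opposite change (Java, hypothetical values; it reuses the test's `admin` client and assumes that growing the pool only adds processor threads, closing no in-flight connection):
   ```java
   // Raise num.network.threads (4 -> 8) instead of shrinking it, so no
   // connection is torn down and no forwarded request needs a retry.
   admin.incrementalAlterConfigs(Collections.singletonMap(
       new ConfigResource(ConfigResource.Type.BROKER, ""),
       Collections.singletonList(new AlterConfigOp(
           new ConfigEntry("num.network.threads", "8"), AlterConfigOp.OpType.SET))))
       .all().get();
   ```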



##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -303,17 +303,17 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 addBrokerReconfigurable(controller.socketServer)
   }
 
-  def addReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
+  def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
 verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)

Review Comment:
   It seems `verifyReconfigurableConfigs` does not need the lock either, since it 
just checks the input strings.



##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1541,6 +1541,36 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testReduceNumNetworkThreads(): Unit = {

Review Comment:
   Pardon me, is this test case related to the deadlock?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]

2024-04-30 Thread via GitHub


mjsax commented on PR #15607:
URL: https://github.com/apache/kafka/pull/15607#issuecomment-2088000194

   Can we update the existing tests to use a `List` instead of a `Map` (as I 
mentioned on the duplicate ticket 
https://issues.apache.org/jira/browse/KAFKA-16644), which should expose the bug?
   
   I need to think about the fix a little more (and refresh my memory on the 
different "subscription types").


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16644.
-
Resolution: Duplicate

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-04-30 Thread via GitHub


mjsax commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1585886754


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
  final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing 
operations.

Review Comment:
   ```suggestion
* and does not require further repartitioning by downstream key dependent operations.
   ```



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
 final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper, new NamedInternal(named));
-selectKeyProcessorNode.keyChangingOperation(true);
+selectKeyProcessorNode.keyChangingOperation(repartitionRequired);
 
 builder.addGraphNode(graphNode, selectKeyProcessorNode);
 
 // key serde cannot be preserved
 return new KStreamImpl<>(
-selectKeyProcessorNode.nodeName(),
-null,
-valueSerde,
-subTopologySourceNodes,
-true,
-selectKeyProcessorNode,
-builder);
+selectKeyProcessorNode.nodeName(),

Review Comment:
   nit: avoid unnecessary reformatting (i.e., indentation in this case) -- I 
assume you have some "auto format" feature enabled in your IDE. I would 
recommend disabling it, or adjusting the setting, to avoid noise like this.



##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
  final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing 
operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * 
+ * This method will overwrite a default behavior as described below.
+ * By default, Kafka Streams always automatically repartition the records 
to prepare for a stateful operation,
+ * however, it is not always required when input stream is partitioned as 
intended. As an example,
+ * if an input stream is partitioned by a String key1, calling the below 
function will trigger a repartition:
+ * 
+ * {@code
+ * KStream inputStream = builder.stream("topic");
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ * 
+ * You can then overwrite the default behavior by calling this method:
+ * {@code
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .markAsPartitioned()
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ *  

Review Comment:
   Do we need this tag?



##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
  final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing 
operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.

Review Comment:
   Can you refresh my memory about joins? I cannot remember the details.
   
We should add a section to `docs/streams/developer-guide/dsl-api.html` 
explaining the "dos" and "don'ts" of this operation.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 

[jira] [Assigned] (KAFKA-16650) add integration test for Admin#abortTransaction

2024-04-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16650:
--

Assignee: Kuan Po Tseng  (was: Chia-Ping Tsai)

> add integration test for Admin#abortTransaction
> ---
>
> Key: KAFKA-16650
> URL: https://issues.apache.org/jira/browse/KAFKA-16650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>
> It seems there are only a few unit tests. We should add an IT including zk, 
> kraft, and the new group coordinator for it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16650) add integration test for Admin#abortTransaction

2024-04-30 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842542#comment-17842542
 ] 

Kuan Po Tseng commented on KAFKA-16650:
---

May I take over this issue? :)

> add integration test for Admin#abortTransaction
> ---
>
> Key: KAFKA-16650
> URL: https://issues.apache.org/jira/browse/KAFKA-16650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> It seems there are only a few unit tests. We should add an IT including zk, 
> kraft, and the new group coordinator for it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-30 Thread via GitHub


AndrewJSchofield commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1585852808


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws 
Exception {
 }
 }
 
+/**
+ * Tests that calling {@link Thread#interrupt()} before {@link 
KafkaConsumer#poll(Duration)}
+ * causes {@link InterruptException} to be thrown.
+ */
+@Test
+public void testPollThrowsInterruptExceptionIfInterrupted() {
+consumer = newConsumer();
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+consumer.assign(singleton(tp));
+
+// interrupt the thread and call poll
+try {
+Thread.currentThread().interrupt();
+assertThrows(InterruptException.class, () -> 
consumer.poll(Duration.ZERO));
+} finally {
+// clear interrupted state again since this thread may be reused 
by JUnit

Review Comment:
   By calling `Thread.interrupted()`, the code is ensuring that the test does 
not exit with the thread still in an interrupted state. I have updated the 
comment accordingly.
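   
   For reference, the JDK distinction this hinges on (standard `java.lang.Thread` API):
   ```java
   Thread.currentThread().isInterrupted(); // reads the interrupt flag, leaves it set
   Thread.interrupted();                   // reads the interrupt flag AND clears it
   ```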



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-04-30 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842538#comment-17842538
 ] 

Chia-Ping Tsai commented on KAFKA-16223:


[~cmukka20] Could we take over the remaining tasks? KafkaConfigBackingStoreTest 
is the last one which uses PowerMock. It is time to close this migration party :)
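
For anyone picking this up, a generic sketch of the migration pattern (hypothetical class and values, not the actual KafkaConfigBackingStoreTest code):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.junit.jupiter.api.Test;

public class ConverterMockMigrationExample {
    // EasyMock/PowerMock style being retired:
    //   converter = EasyMock.createMock(Converter.class);
    //   EasyMock.expect(converter.toConnectData(...)).andReturn(parsed);
    //   PowerMock.replayAll(); ... PowerMock.verifyAll();
    @Test
    public void mockitoEquivalent() {
        // Mockito replacement: no replay step, verification is explicit.
        Converter converter = mock(Converter.class);
        SchemaAndValue parsed = new SchemaAndValue(null, "value");
        when(converter.toConnectData(anyString(), any(byte[].class))).thenReturn(parsed);
        // ... exercise the code under test, which calls the converter ...
        converter.toConnectData("topic", new byte[0]);
        verify(converter).toConnectData(anyString(), any(byte[].class));
    }
}
```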

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership

2024-04-30 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842537#comment-17842537
 ] 

Johnny Hsu commented on KAFKA-16027:


Hey [~alexanderaghili], may I know if there are any updates on this?
I am happy to help if you are busy with something else :) 

> Refactor MetadataTest#testUpdatePartitionLeadership
> ---
>
> Key: KAFKA-16027
> URL: https://issues.apache.org/jira/browse/KAFKA-16027
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Philip Nee
>Assignee: Alexander Aghili
>Priority: Minor
>  Labels: newbie
>
> MetadataTest#testUpdatePartitionLeadership is extremely long. I think it is 
> pretty close to the 160-line method limit - I tried to modify it, but it 
> would hit the limit when I tried to break things into separate lines.
> The test also contains two tests, so it is best to split it into two separate 
> tests.
> We should also move this to ConsumerMetadata.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-30 Thread via GitHub


FrankYang0529 commented on PR #15745:
URL: https://github.com/apache/kafka/pull/15745#issuecomment-2087904101

   Hi @chia7712, thanks for the review. I addressed all comments and added some 
test cases for ClusterConfig.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-30 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1580155319


##
clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java:
##
@@ -44,11 +44,15 @@ public enum ControlRecordType {
 ABORT((short) 0),
 COMMIT((short) 1),
 
-// Raft quorum related control messages.
+// KRaft quorum related control messages
 LEADER_CHANGE((short) 2),
 SNAPSHOT_HEADER((short) 3),
 SNAPSHOT_FOOTER((short) 4),
 
+// KRaft membership changes messages
+KRAFT_VERSION((short) 5),
+VOTERS((short) 6),

Review Comment:
   Sounds good. Fixed for KRAFT_VOTERS. I'll fix the rest in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842530#comment-17842530
 ] 

Kirk True commented on KAFKA-16637:
---

[~chickenchickenlove]—thanks for filing this. There are two existing 
improvements (KAFKA-15974 and KAFKA-16200) that fix timing issues in the new 
consumer. However, even when testing your case on a temporary branch that 
includes fixes for both of those issues, the problem still showed up.

This issue is related to an optimization for offset fetch logic.

When a user calls {{Consumer.poll()}}, among other things, the consumer 
performs a network request to fetch any previously-committed offsets so it can 
determine from where to start fetching new records. When the user passes in a 
timeout of zero, it's almost always the case that the offset fetch network 
request will not be performed within 0 milliseconds. However, the consumer 
still sends out the request and handles the response when it is received, 
usually a few milliseconds later. On this first attempt the lookup fails and 
{{poll()}} loops back around. Given that timing out on the first attempt is the 
common case, the consumer caches the offset fetch response/result from that 
attempt (even though it timed out), because it knows that the _next_ call to 
{{poll()}} is going to attempt the exact same operation. When the operation is 
attempted a second time, the response is already there from the first attempt, 
so the consumer doesn't need to perform another network request.
 
The existing consumer has implemented this caching in 
[PendingCommittedOffsetRequest|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L132].
 The new consumer has implemented it in 
[CommitRequestManager|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L510].
 The core issue is that the new consumer implementation clears out the first 
attempt's cached result too aggressively. The effect is that the second (and 
subsequent) attempts fail to find the previous attempt's cached result, so each 
submits a new network request, and all of them fail. Thus the consumer never 
makes any headway.
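
A minimal sketch of the caching idea (hypothetical field and method names; the real logic lives in CommitRequestManager):

```java
// Cache the first attempt's in-flight future so that the next poll(), asking
// for the same partitions, reuses it instead of starting a new request.
private Set<TopicPartition> inflightPartitions;
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> inflightFetch;

CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedOffsets(Set<TopicPartition> partitions) {
    if (inflightFetch != null && partitions.equals(inflightPartitions))
        return inflightFetch;                           // second attempt: reuse the cached request
    inflightPartitions = partitions;
    inflightFetch = sendOffsetFetchRequest(partitions); // hypothetical network call
    return inflightFetch;
}
// The bug described above: clearing inflightFetch too aggressively means the
// reuse branch is never taken, so every poll(0) starts a brand-new request
// that also times out.
```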

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to 

Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-04-30 Thread via GitHub


pasharik commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1585713585


##
core/src/main/scala/kafka/admin/AclCommand.scala:
##
@@ -115,8 +115,6 @@ object AclCommand extends Logging {
   val aclBindings = acls.map(acl => new AclBinding(resource, 
acl)).asJavaCollection
   adminClient.createAcls(aclBindings).all().get()
 }
-
-listAcls(adminClient)

Review Comment:
   Restored this print for now.
   
   For me, this print, together with the test's check of its output, causes the 
kraft tests to be flaky. E.g.
   ```
   ./gradlew core:test --tests "kafka.admin.AclCommandTest.testAclCliWithAdminAPI" --rerun
   ```
   
   It always passes for `zk`, but quite often fails for `kraft`.
   
   As I understand it, `listAcls()` can be served by any broker in kraft mode, 
not just by the controller, so some brokers may not have the most up-to-date 
copy of the metadata right after the `createAcls()` call. Here there is no 
interval between `createAcls()` and `listAcls()`, so the probability of such a 
race condition is higher.
   From [KIP-500](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500) 
and the 
[diagram](https://cwiki.apache.org/confluence/download/attachments/123898922/b.png)
 it seems that brokers have to periodically pull new metadata from the kraft 
controller, so metadata is not available on all brokers immediately.
   Please correct me if I'm wrong here.
   
   I've looked at `TopicCommand` as another example of a class which manages 
metadata. It seems that it doesn't invoke `listTopics()` after creating or 
deleting topics, so I thought we could use a similar approach here.
   
   I tried to reproduce this flaky behavior by running a Kafka controller and 
broker locally with `bin/kafka-server-start.sh` and invoking 
`bin/kafka-acls.sh` manually, but wasn't able to reproduce the same issue.
   
   Probably it is related to the test infrastructure setup. I'll try to rewrite 
the test in Java using the new test infrastructure; let's see if that solves 
the issue.
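   
   If the root cause is indeed metadata propagation lag, a retry-based check could stabilize the assertion. A plain-Java sketch (assuming an `expectedAcls` collection from the test, inside a test method that declares `throws Exception`; real tests would likely use `TestUtils.waitForCondition` instead):
   ```java
   // Poll (with a timeout) until the broker's view contains the ACLs we just
   // created, instead of asserting on the very next listAcls() call.
   long deadline = System.currentTimeMillis() + 15_000;
   while (!admin.describeAcls(AclBindingFilter.ANY).values().get().containsAll(expectedAcls)) {
       if (System.currentTimeMillis() > deadline)
           throw new AssertionError("ACLs were not propagated to the broker in time");
       Thread.sleep(100);
   }
   ```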



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-04-30 Thread via GitHub


pasharik commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1585704666


##
core/src/main/scala/kafka/admin/AclCommand.scala:
##
@@ -115,8 +115,6 @@ object AclCommand extends Logging {
   val aclBindings = acls.map(acl => new AclBinding(resource, 
acl)).asJavaCollection
   adminClient.createAcls(aclBindings).all().get()
 }
-
-listAcls(adminClient)

Review Comment:
   Restored this print for now.
   
   For me, this print, together with the test's check of its output, causes the 
kraft tests to be flaky. E.g.
   ```
   ./gradlew core:test --tests "kafka.admin.AclCommandTest.testAclCliWithAdminAPI" --rerun
   ```
   
   It always passes for `zk`, but quite often fails for `kraft`.
   
   As I understand it, `listAcls()` can be served by any broker in kraft mode, 
not just by the controller, so some brokers may not have the most up-to-date 
copy of the metadata right after the `createAcls()` call. Here there is no 
interval between `createAcls()` and `listAcls()`, so the probability of such a 
race condition is higher.
   From [KIP-500](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500) 
and the 
[diagram](https://cwiki.apache.org/confluence/download/attachments/123898922/b.png)
 it seems that brokers have to periodically pull new metadata from the kraft 
controller, so metadata is not available on all brokers immediately.
   Please correct me if I'm wrong here.
   
   I've looked at `TopicCommand` as another example of a class which manages 
metadata. It seems that it doesn't invoke `listTopics()` after creating or 
deleting topics, so I thought we could use a similar approach here.
   
   I tried to reproduce this flaky behavior by running a Kafka controller and 
broker locally with `bin/kafka-server-start.sh` and invoking 
`bin/kafka-acls.sh` manually, but wasn't able to reproduce the same issue.
   
   Probably it is related to the test infrastructure setup. I'll try to rewrite 
the test in Java using the new test infrastructure; let's see if that solves 
the issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-04-30 Thread via GitHub


pasharik commented on PR #15830:
URL: https://github.com/apache/kafka/pull/15830#issuecomment-2087745006

   > Hi @pasharik. Thanks for the change.
   > 
   > > In the original implementation, listAcls() method was called directly 
from addAcls() and removeAcls() methods, which caused a race condition in KRaft 
mode, so the test become flaky
   > 
   > Can you share a bit more detail on this? How specifically does 
`listAcls()` contribute to flakiness? Thanks
   
   Replied in the [above 
thread](https://github.com/apache/kafka/pull/15830#discussion_r1584190323)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842514#comment-17842514
 ] 

Matthias J. Sax commented on KAFKA-16644:
-

Sorry. Wrong link. Fixed -> https://issues.apache.org/jira/browse/KAFKA-14748 

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16644:

Description: 
We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 

  was:
We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 


> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16475: add more tests to TopicImageNodeTest [kafka]

2024-04-30 Thread via GitHub


cmccabe merged PR #15735:
URL: https://github.com/apache/kafka/pull/15735


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-30 Thread via GitHub


linu-shibu commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2087468237

   @gharris1727 I do not have permission/write access to merge the PR. Will I 
get permission/rights to merge? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-30 Thread via GitHub


linu-shibu commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2087467721

   > Test failures appear unrelated, there's a targeted 
RemoteLogMetadataSerdeTest for this logic, and the storage tests appear to pass 
for me locally.
   
   Yes, locally the tests are passing for me as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


philipnee commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585558712


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##
@@ -30,12 +29,7 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
 */
 private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) {
-super(type, timer);
-this.offsets = validate(offsets);
-}
-
-protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
+public CommitEvent(final Type type, Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {

Review Comment:
   ditto: we probably don't need the `final`, but it would be good to be 
consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


philipnee commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585179214


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1273,6 +1228,22 @@ private void close(Duration timeout, boolean 
swallowException) {
 if (applicationEventHandler != null)
 closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
 closeTimer.update();
+
+// close() can be called from inside one of the constructors. In that 
case, it's possible that neither
+// the reaper nor the background event queue were constructed, so 
check them first to avoid NPE.
+if (backgroundEventReaper != null && backgroundEventQueue != null) {
+// Copy over the completable events to a separate list, then reap any
+// incomplete events on that list.
+LinkedList<BackgroundEvent> allEvents = new LinkedList<>();

Review Comment:
   Any specific reason for using linkedlist implementation? 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CompletableEventReaperTest {
+
+private final LogContext logContext = new LogContext();
+private final Time time = new MockTime(0, 0, 0);

Review Comment:
   The test should work without setting the current time to 0, so I think `new 
MockTime(0)` should be fine.



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -987,6 +987,7 @@ public void testResetUsingAutoResetPolicy(GroupProtocol 
groupProtocol) {
 @ParameterizedTest
 @EnumSource(GroupProtocol.class)
 public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) {
+Time time = new MockTime(1);

Review Comment:
   can we remove this? I think the test works without it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-30 Thread via GitHub


hachikuji commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1583487039


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -181,20 +181,12 @@ class KafkaRaftManager[T](
   private val clientDriver = new KafkaRaftClientDriver[T](client, 
threadNamePrefix, fatalFaultHandler, logContext)
 
   def startup(): Unit = {
-// Update the voter endpoints (if valid) with what's in RaftConfig
-val voterAddresses: util.Map[Integer, AddressSpec] = 
controllerQuorumVotersFuture.get()
-for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
-  voterAddressEntry.getValue match {
-case spec: InetAddressSpec =>
-  netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
-case _: UnknownAddressSpec =>
-  info(s"Skipping channel update for destination ID: 
${voterAddressEntry.getKey} " +
-s"because of non-routable endpoint: 
${NON_ROUTABLE_ADDRESS.toString}")
-case invalid: AddressSpec =>
-  warn(s"Unexpected address spec (type: ${invalid.getClass}) for 
channel update for " +
-s"destination ID: ${voterAddressEntry.getKey}")
-  }
-}
+client.initialize(
+  controllerQuorumVotersFuture.get(),
+  config.controllerListenerNames.head,
+  new FileBasedStateStore(new File(dataDir, "quorum-state")),

Review Comment:
   Maybe we can have a constant for the file name. Not sure if it is used 
elsewhere, but it would be nice to have a descriptive name.
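   
   For example (hypothetical constant name and placement):
   ```java
   // A named constant reads better than a bare string literal:
   public static final String QUORUM_STATE_FILE_NAME = "quorum-state";
   ```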



##
raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java:
##
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Arrays;
+import java.util.Optional;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.MockLog;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RecordsSnapshotWriter;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+final class KRaftControlRecordStateMachineTest {
+private static final RecordSerde<String> STRING_SERDE = new StringSerde();
+
+private static MockLog buildLog() {
+return new MockLog(new TopicPartition("partition", 0), 
Uuid.randomUuid(), new LogContext());
+}
+
+private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log, Optional<VoterSet> staticVoterSet) {
+return new KRaftControlRecordStateMachine(
+staticVoterSet,
+log,
+STRING_SERDE,
+BufferSupplier.NO_CACHING,
+1024,
+new LogContext()
+);
+}
+
+@Test
+void testEmptyParition() {

Review Comment:
   nit: typo Par**t**ition



##
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##
@@ -206,8 +151,16 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
-public static Map<Integer, AddressSpec> parseVoterConnections(List<String> voterEntries) {
-Map<Integer, AddressSpec> voterMap = new HashMap<>();
+public static Map<Integer, InetSocketAddress> parseVoterConnections(List<String> voterEntries) {
+return parseVoterConnections(voterEntries, true);
+}
+
+public static Set<Integer> parseVoterIds(List<String> voterEntries) {
+return parseVoterConnections(voterEntries, false).keySet();
+}
+
+private static Map<Integer, InetSocketAddress> parseVoterConnections(List<String> voterEntries, boolean routableOnly) {

Review Comment:
   nit: I think `requireRoutableAddresses` might convey the expectation more 
clearly.



##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -213,20 +213,21 @@ private void completeCurrentBatch() {
  *
  * @param valueCreator a function that uses the passed buffer to create 
the control
  *batch that will be appended. The memory records returned must 

[jira] [Created] (KAFKA-16650) add integration test for Admin#abortTransaction

2024-04-30 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16650:
--

 Summary: add integration test for Admin#abortTransaction
 Key: KAFKA-16650
 URL: https://issues.apache.org/jira/browse/KAFKA-16650
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


It seems there are only a few unit tests. We should add an IT including zk, 
kraft, and the new group coordinator for it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2087359722

   Hey @kirktrue, thanks a lot for the PR, this is a big piece! I completed a 
pass over all the non-test files and left some comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2087355371

   > > Here I have a comment, I could not put at the right location in the code:
   > > 
   > > On line 1362, in commitSync() the consumer waits on the commitFuture 
with a timer. I think, it should not wait on a timer there since we already 
wait on a timer in the background thread.
   > 
   > I agree. What about the timed wait in 
awaitPendingAsyncCommitsAndExecuteCommitCallbacks()?
   
   Agree we should not wait on the `commitFuture` with a timer, because the 
deadline is contained in the event we submitted and is already enforced by the 
reaper; I'm also not clear on what the proposed relationship with 
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks` is?
   
   I would expect we only need to call `ConsumerUtils.getResult(commitFuture);`, 
and that is consistent with how we get results for all other completable events 
now:
   - we create an event with a deadline
   - we call `applicationEventHandler.addAndGet(event)`
   
   For the commit case that flow has a different shape just because we use 
`applicationEventHandler.add(event)` 
[here](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L775),
 to cater for commit sync and async, but we should still apply the same 
approach and just call get without any time boundary, I would say. 
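   
   A sketch of that flow (hypothetical wiring; the deadline travels inside the event, so the caller needs no second timer):
   ```java
   // The reaper completes the event exceptionally once deadlineMs passes,
   // so getResult() can block without its own timeout.
   CommitEvent event = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout));
   applicationEventHandler.add(event);
   ConsumerUtils.getResult(event.future());
   ```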


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]

2024-04-30 Thread via GitHub


cmccabe opened a new pull request, #15838:
URL: https://github.com/apache/kafka/pull/15838

   Do not acquire the DynamicBrokerConfig lock in 
DynamicBrokerConfig.removeReconfigurable. It's not necessary, because the list 
that these functions are modifying is a thread-safe CopyOnWriteArrayList.  In 
DynamicBrokerConfig.reloadUpdatedFilesWithoutConfigChange, I changed the code 
to use a simple Java forEach rather than a Scala conversion, in order to feel 
more confident that concurrent modifications to the List would not have any bad 
effects here. (forEach is always safe on CopyOnWriteArrayList.)
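   
   A short illustration of the property this relies on (plain `java.util.concurrent`, not the actual broker code):
   ```java
   import java.util.concurrent.CopyOnWriteArrayList;
   
   public class SnapshotIteration {
       public static void main(String[] args) {
           CopyOnWriteArrayList<String> reconfigurables = new CopyOnWriteArrayList<>();
           reconfigurables.add("a");
           reconfigurables.add("b");
           // forEach iterates over an immutable snapshot of the backing array, so a
           // concurrent remove() can never throw ConcurrentModificationException or
           // observe a half-updated list -- no external lock required.
           reconfigurables.forEach(r -> reconfigurables.remove("b"));
           System.out.println(reconfigurables); // prints [a]
       }
   }
   ```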


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 UserScramCredentialsCommandTest rewritten in java [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15832:
URL: https://github.com/apache/kafka/pull/15832#discussion_r1585527416


##
core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.server.BaseRequestTest;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.Exit;
+import org.apache.kafka.test.NoRetryException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.Console;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("dontUseSystemExit")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults

Review Comment:
   `@ClusterTestDefaults(clusterType = Type.ALL)` Let's test all types



##
core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.server.BaseRequestTest;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.Exit;
+import org.apache.kafka.test.NoRetryException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.Console;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("dontUseSystemExit")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults
+public class UserScramCredentialsCommandTest extends BaseRequestTest {
+private static final String USER1 = "user1";
+private static final String USER2 = "user2";
+
+@Override
+public int brokerCount() {
+return 1;
+}
+
+static class ConfigCommandResult {
+public final String stdout;
+public final OptionalInt exitStatus;
+
+public ConfigCommandResult(String stdout) {
+this(stdout, OptionalInt.empty());
+}
+
+public ConfigCommandResult(String stdout, OptionalInt exitStatus) {
+this.stdout = stdout;
+this.exitStatus = exitStatus;
+}
+}
+
+private ConfigCommandResult 

[jira] [Updated] (KAFKA-16649) Remove lock from DynamicBrokerConfig.removeReconfigurable

2024-04-30 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-16649:
-
Summary: Remove lock from DynamicBrokerConfig.removeReconfigurable  (was: 
Fix potential deadlock in DynamicBrokerConfig)

> Remove lock from DynamicBrokerConfig.removeReconfigurable
> -
>
> Key: KAFKA-16649
> URL: https://issues.apache.org/jira/browse/KAFKA-16649
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16649) Remove lock from DynamicBrokerConfig.removeReconfigurable

2024-04-30 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-16649:
-
Description: Do not acquire the DynamicBrokerConfig lock in 
DynamicBrokerConfig.removeReconfigurable. It's not necessary, because the list 
that these functions are modifying is a thread-safe CopyOnWriteArrayList.
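
For illustration, a minimal standalone sketch (hypothetical registry class, not the actual DynamicBrokerConfig code) of why a CopyOnWriteArrayList needs no external lock for add/remove/iterate:

{code:java}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ReconfigurableRegistry {
    // Every mutation copies the backing array, so readers and writers never
    // block each other and no shared monitor (hence no lock cycle) is needed.
    private final List<Runnable> reconfigurables = new CopyOnWriteArrayList<>();

    public void addReconfigurable(Runnable r) {
        reconfigurables.add(r);     // thread-safe without synchronized
    }

    public void removeReconfigurable(Runnable r) {
        reconfigurables.remove(r);  // thread-safe without synchronized
    }

    public void reconfigureAll() {
        // Iterates over an immutable snapshot; concurrent removals are safe
        // and never throw ConcurrentModificationException.
        reconfigurables.forEach(Runnable::run);
    }
}
{code}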

> Remove lock from DynamicBrokerConfig.removeReconfigurable
> -
>
> Key: KAFKA-16649
> URL: https://issues.apache.org/jira/browse/KAFKA-16649
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Major
>
> Do not acquire the DynamicBrokerConfig lock in 
> DynamicBrokerConfig.removeReconfigurable. It's not necessary, because the 
> list that these functions are modifying is a thread-safe CopyOnWriteArrayList.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16649) Fix potential deadlock in DynamicBrokerConfig

2024-04-30 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-16649:


 Summary: Fix potential deadlock in DynamicBrokerConfig
 Key: KAFKA-16649
 URL: https://issues.apache.org/jira/browse/KAFKA-16649
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842504#comment-17842504
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-16644 at 4/30/24 8:57 PM:
-

[~mjsax] is KAFKA-14778 the correct issue that introduced a regression? That 
seems to link to an unrelated (and also unresolved) ticket


was (Author: ableegoldman):
[~mjsax] is KAFKA-14778 the correct issue? It seems to link to an unrelated 
(and also unresolved) ticket

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> (https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  
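
Incidentally, the Map-vs-List point is easy to demonstrate in isolation: verifying output through a keyed Map silently collapses a duplicated tombstone, while a List preserves both records (a minimal sketch, not the actual test code):

{code:java}
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapHidesDuplicateTombstones {
    public static void main(String[] args) {
        // Two tombstones (null values) for the same key, as the buggy join emits.
        List<Map.Entry<String, String>> output = Arrays.asList(
                new SimpleEntry<>("k1", null),
                new SimpleEntry<>("k1", null));

        Map<String, String> asMap = new HashMap<>();
        output.forEach(e -> asMap.put(e.getKey(), e.getValue()));

        System.out.println(output.size()); // 2 -- the List exposes the duplicate
        System.out.println(asMap.size());  // 1 -- the Map hides it
    }
}
{code}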



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



Re: [PR] KAFKA-15307: Update/errors for deprecated config [kafka]

2024-04-30 Thread via GitHub


Cerchie commented on PR #14448:
URL: https://github.com/apache/kafka/pull/14448#issuecomment-2087251352

   tagging @mjsax in for re-review 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1585496531


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -319,7 +302,10 @@ public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBroke
 }
 
 public ClusterConfig build() {
-return new ClusterConfig(type, brokers, controllers, name, 
autoStart, securityProtocol, listenerName,
+if (brokers <= 0 || controllers <= 0 || disksPerBroker <= 0) {

Review Comment:
   We should allow setting `brokers` to 0, as it is valid to have a cluster with 
only quorum nodes.



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -202,8 +151,19 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 for (ClusterConfigProperty property : annot.serverProperties()) {
 serverProperties.put(property.key(), property.value());
 }
-configBuilder.setServerProperties(serverProperties);
-type.invocationContexts(context.getRequiredTestMethod().getName(), 
configBuilder.build(), testInvocations);
+ClusterConfig config = ClusterConfig.builder()
+.setType(type)
+.setBrokers(annot.brokers() == 0 ? defaults.brokers() : 
annot.brokers())
+.setControllers(annot.controllers() == 0 ? 
defaults.controllers() : annot.controllers())
+.setDisksPerBroker(annot.disksPerBroker() == 0 ? 
defaults.disksPerBroker() : annot.disksPerBroker())
+.setAutoStart(annot.autoStart() == AutoStart.DEFAULT ? 
defaults.autoStart() : annot.autoStart() == AutoStart.YES)
+.setName(annot.name().isEmpty() ? null : annot.name())

Review Comment:
   maybe `annot.name().trim().isEmpty()` is more suitable.



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -202,8 +151,19 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 for (ClusterConfigProperty property : annot.serverProperties()) {
 serverProperties.put(property.key(), property.value());
 }
-configBuilder.setServerProperties(serverProperties);
-type.invocationContexts(context.getRequiredTestMethod().getName(), 
configBuilder.build(), testInvocations);
+ClusterConfig config = ClusterConfig.builder()
+.setType(type)
+.setBrokers(annot.brokers() == 0 ? defaults.brokers() : 
annot.brokers())
+.setControllers(annot.controllers() == 0 ? 
defaults.controllers() : annot.controllers())
+.setDisksPerBroker(annot.disksPerBroker() == 0 ? 
defaults.disksPerBroker() : annot.disksPerBroker())
+.setAutoStart(annot.autoStart() == AutoStart.DEFAULT ? 
defaults.autoStart() : annot.autoStart() == AutoStart.YES)
+.setName(annot.name().isEmpty() ? null : annot.name())
+.setListenerName(annot.listener().isEmpty() ? null : 
annot.listener())

Review Comment:
   ditto



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -319,7 +302,10 @@ public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBroke
 }
 
 public ClusterConfig build() {
-return new ClusterConfig(type, brokers, controllers, name, 
autoStart, securityProtocol, listenerName,
+if (brokers <= 0 || controllers <= 0 || disksPerBroker <= 0) {

Review Comment:
   btw, those checks should be moved to the constructor of `ClusterConfig`
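
   For illustration, a sketch of constructor-side validation (simplified, 
hypothetical fields; `brokers == 0` is left legal per the comment above about 
quorum-only clusters):

   ```java
   public final class ClusterConfigSketch {
       private final int brokers;
       private final int controllers;
       private final int disksPerBroker;

       // Validating in the constructor guarantees every instance is well-formed,
       // no matter which builder or factory produced it.
       ClusterConfigSketch(int brokers, int controllers, int disksPerBroker) {
           if (brokers < 0 || controllers <= 0 || disksPerBroker <= 0) {
               throw new IllegalArgumentException(
                   "brokers must be >= 0; controllers and disksPerBroker must be > 0");
           }
           this.brokers = brokers;
           this.controllers = controllers;
           this.disksPerBroker = disksPerBroker;
       }
   }
   ```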



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15800:
URL: https://github.com/apache/kafka/pull/15800#discussion_r1585492894


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -91,9 +91,6 @@ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts
 ClusterTemplate clusterTemplateAnnot = 
context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
 if (clusterTemplateAnnot != null) {
 processClusterTemplate(context, clusterTemplateAnnot, 
generatedContexts::add);
-if (generatedContexts.isEmpty()) {
-throw new IllegalStateException("ClusterConfig generator 
method should provide at least one config");

Review Comment:
   > try to add unit test for it.
   
   nice. It is ok to file a jira to track it, and we can merge this PR first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-04-30 Thread via GitHub


Cerchie commented on PR #14360:
URL: https://github.com/apache/kafka/pull/14360#issuecomment-2087132901

   tagging @mjsax here, made some edits in response to the last round 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-04-30 Thread via GitHub


Cerchie commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1585489940


##
docs/streams/developer-guide/config-streams.html:
##
@@ -257,7 +258,12 @@ num.standby.replicasThe maximum number of records to buffer per 
partition.
 1000
   
-  cache.max.bytes.buffering
+  statestore.cache.max.bytes
+Medium
+Maximum number of memory bytes to be used for 
record caches across all threads. Note that at the debug level you can use 
cache.size to monitor the actual size of the Kafka Streams 
cache.
+10485760
+  
+  cache.max.bytes.buffering (Deprecated. Use 
cache.max.bytes instead.)

Review Comment:
   went through and did it manually, I think there was more than just this 
instance



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1585489570


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int,
   // A map containing sessions which can be evicted by privileged sessions.
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, 
FetchSession]
 
+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => 
FetchSessionCache.this.size)
-  
metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, 
() => FetchSessionCache.this.totalPartitions)
-  
metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
+  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, 
metricTag)
+  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => 
FetchSessionCache.this.size, metricTag)

Review Comment:
   not sure whether this is allowed. It seems to break metrics compatibility, as 
it adds new tags, meaning Kafka users who monitor this metric need to update 
their queries.
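
   To make the concern concrete, a sketch of how the registered MBean name 
changes once a per-shard tag is added (names illustrative, not the exact Kafka 
MBean strings):

   ```java
   public class MetricNameSketch {
       public static void main(String[] args) {
           // Before: a single untagged gauge; dashboards query this exact name.
           System.out.println(
               "kafka.server:type=FetchSessionCache,name=NumIncrementalFetchSessions");
           // After sharding: one gauge per shard, distinguished by the new tag.
           // Existing queries no longer match and must aggregate across shards.
           for (int shard = 0; shard < 4; shard++) {
               System.out.println(
                   "kafka.server:type=FetchSessionCache,name=NumIncrementalFetchSessions,shard=" + shard);
           }
       }
   }
   ```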



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, 
each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), 
(shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   Pardon me, what happens when users update `numIoThreads` dynamically?
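
   For illustration, the shard-selection arithmetic described in the comment 
above, as a self-contained sketch (hypothetical names, not the PR's code); it 
also shows why a dynamic thread-count change is awkward, since the range 
boundaries are fixed when the shards are built:

   ```java
   public class ShardSelectionSketch {
       public static void main(String[] args) {
           int numShards = 8;                                 // e.g. config.numIoThreads
           int sessionIdRange = Integer.MAX_VALUE / numShards;
           // Shard i owns session ids in [max(1, i * sessionIdRange), (i + 1) * sessionIdRange).
           int sessionId = 1_234_567_890;
           int shard = sessionId / sessionIdRange;
           System.out.println("session " + sessionId + " -> shard " + shard);
           // If numShards changes later, sessionIdRange changes too, and ids handed
           // out under the old layout would map to different shards.
       }
   }
   ```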



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]

2024-04-30 Thread via GitHub


TaiJuWu commented on code in PR #15800:
URL: https://github.com/apache/kafka/pull/15800#discussion_r1585486875


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -91,9 +91,6 @@ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts
 ClusterTemplate clusterTemplateAnnot = 
context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
 if (clusterTemplateAnnot != null) {
 processClusterTemplate(context, clusterTemplateAnnot, 
generatedContexts::add);
-if (generatedContexts.isEmpty()) {
-throw new IllegalStateException("ClusterConfig generator 
method should provide at least one config");

Review Comment:
   If we move the check into `processClusterTemplate`, `generatedContexts` would 
also need to be exposed to it, so I prefer to keep this version, revert the 
change, and try to add a unit test for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 8:25 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your client code again, locally 
starting a broker in kraft mode, with the default config, only adding 
`group.coordinator.rebalance.protocols=consumer,classic`. 1 topic, 1 partition, 
1 instance of your consumer app running with the poll duration of 1s, and was 
able to consume messages as expected. I only changed to StringDeserializers for 
simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double-checking you're still facing issues, I would suggest taking a look 
at and sharing the broker logs to understand more about what's going on with 
your setup. If all looks good there, maybe provide a ConsumerRebalanceListener 
to the call to subscribe, just to check/print the partitions assigned to your 
consumer in the onPartitionsAssigned callback. Hope it helps!


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps!

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | 


Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-04-30 Thread via GitHub


Cerchie commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1585481048


##
docs/streams/developer-guide/config-streams.html:
##
@@ -257,7 +258,12 @@ num.standby.replicasThe maximum number of records to buffer per 
partition.
 1000
   
-  cache.max.bytes.buffering
+  statestore.cache.max.bytes
+Medium
+Maximum number of memory bytes to be used for 
record caches across all threads. Note that at the debug level you can use 
cache.size to monitor the actual size of the Kafka Streams 
cache.

Review Comment:
   found it in some confluent docs -- looks like it's been partially implemented 
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390). 
Will remove.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on PR #15055:
URL: https://github.com/apache/kafka/pull/15055#issuecomment-2087068199

   > Okay I messed up the git a little here (still have much to learn), the 
current trunk does exist on my 
[Kafka-16027](https://issues.apache.org/jira/browse/KAFKA-16027) branch but I 
had to redo it force push which led to auto-closing this branch. Any 
recommendation on what do to next?
   
   that is totally bad news. Maybe you can take a look at this post: 
https://stackoverflow.com/questions/3973994/how-can-i-recover-from-an-erronous-git-push-f-origin-master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-30 Thread via GitHub


chia7712 merged PR #15808:
URL: https://github.com/apache/kafka/pull/15808


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585466853


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java:
##
@@ -16,9 +16,118 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.time.Duration;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Objects.requireNonNull;
 
+/**
+ * {@code CompletableEvent} is an interface that is used by both {@link 
CompletableApplicationEvent} and
+ * {@link CompletableBackgroundEvent} for common processing and logic. A 
{@code CompletableEvent} is one that
+ * allows the caller to get the {@link #future() future} related to the event 
and the event's
+ * {@link #deadlineMs() expiration timestamp}.
+ *
+ * @param <T> Return type for the event when completed
+ */
 public interface CompletableEvent<T> {
 
+/**
+ * Returns the {@link CompletableFuture future} associated with this 
event. Any event will have some related
+ * logic that is executed on its behalf. The event can complete in one of 
the following ways:
+ *
+ * 
+ * 
+ * Success: when the logic for the event completes successfully, 
the data generated by that event
+ * (if applicable) is passed to {@link 
CompletableFuture#complete(Object)}. In the case where the generic
+ * bound type is specified as {@link Void}, {@code null} is 
provided.
+ * 
+ * Error: when the event logic generates an error, the error 
is passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * 
+ * 
+ * Timeout: when the time spent executing the event logic exceeds 
the {@link #deadlineMs() deadline}, an
+ * instance of {@link TimeoutException} should be created and 
passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * 
+ * 
+ * Cancelled: when an event remains incomplete when the consumer 
closes, the future will be
+ * {@link CompletableFuture#cancel(boolean) cancelled}. Attempts 
to {@link Future#get() get the result}

Review Comment:
   The reaper actually calls `completeExceptionally` with a 
`CancellationException` instead of calling `CompletableFuture#cancel(boolean)`. 
Unless I'm missing a subtle semantic difference they should achieve the same 
result, but still, I would say adding a link to `cancel` here is not accurate.
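
   A tiny self-contained check of that equivalence (plain JDK, nothing 
Kafka-specific): CompletableFuture implements `cancel()` as completion with a 
CancellationException, so the two paths are observationally the same:

   ```java
   import java.util.Arrays;
   import java.util.concurrent.CancellationException;
   import java.util.concurrent.CompletableFuture;

   public class CancelVsCompleteExceptionally {
       public static void main(String[] args) {
           CompletableFuture<String> a = new CompletableFuture<>();
           a.cancel(true);

           CompletableFuture<String> b = new CompletableFuture<>();
           b.completeExceptionally(new CancellationException());

           // Both report cancelled, because cancel() just completes the future
           // with a CancellationException internally.
           System.out.println(a.isCancelled() + " " + b.isCancelled()); // true true
           for (CompletableFuture<String> f : Arrays.asList(a, b)) {
               try {
                   f.join();
               } catch (CancellationException e) {
                   System.out.println("join() threw CancellationException");
               }
           }
       }
   }
   ```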



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1585460093


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,332 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "1"),
+@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true")

Review Comment:
   @frankvicky @FrankYang0529 Could you please address @lianetm comments? This 
class and `DeleteOffsetsConsumerGroupCommandIntegrationTest` need to test 
"LegacyConsumer + NEW_GROUP_COORDINATOR_ENABLE_CONFIG=false 
(`GroupCoordinatorAdapter`)"
   
   We can address that by `ClusterTemplate`. For example:
   
   ```java
   private static void generator(ClusterGenerator clusterGenerator) {
       Map<String, String> serverProperties = new HashMap<>();
       serverProperties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
       serverProperties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
       // case 0: zk
       clusterGenerator.accept(ClusterConfig.builder()
           .setType(Type.ZK)
           .setServerProperties(serverProperties)
           .build());
       // remaining cases (e.g. KRAFT with the new coordinator on/off) follow the same pattern
   }
   ```
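
   A test would then reference the generator by name; a minimal usage sketch 
(test method name hypothetical):

   ```java
   @ClusterTemplate("generator")
   public void testDeleteGroups(ClusterInstance cluster) {
       // runs once per ClusterConfig produced by generator(...)
   }
   ```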

[jira] [Updated] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext

2024-04-30 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula updated KAFKA-9401:
-
Fix Version/s: 3.8.0
   3.7.1

> High lock contention for kafka.server.FetchManager.newContext
> -
>
> Key: KAFKA-9401
> URL: https://issues.apache.org/jira/browse/KAFKA-9401
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Lucas Bradstreet
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> kafka.server.FetchManager.newContext takes out what is essentially a global 
> fetch lock on kafka.server.FetchSessionCache, for updates not only to the 
> FetchSessionCache but also to the fetch sessions stored within it. 
> This causes a high amount of lock contention for fetches, as every fetch 
> request must go through this lock.
> I have taken an async-profiler lock profile on a high throughput cluster, and 
> I see around 25s of waiting on this lock for a sixty second profile.
> {noformat}
> *— 25818577497 ns (20.84%), 5805 samples
>  [ 0] kafka.server.FetchSessionCache
>  [ 1] kafka.server.FetchManager.newContext
>  [ 2] kafka.server.KafkaApis.handleFetchRequest
>  [ 3] kafka.server.KafkaApis.handle
>  [ 4] kafka.server.KafkaRequestHandler.run
>  [ 5] java.lang.Thread.run
>  {noformat}
> FetchSession.scala:
> {code:java}
>   cache.synchronized {
> cache.get(reqMetadata.sessionId) match {
>   case None => {
> debug(s"Session error for ${reqMetadata.sessionId}: no such session 
> ID found.")
> new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, 
> reqMetadata)
>   }
>   case Some(session) => session.synchronized {
> if (session.epoch != reqMetadata.epoch) {
>   debug(s"Session error for ${reqMetadata.sessionId}: expected epoch 
> " +
> s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
>   new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, 
> reqMetadata)
> } else {
>   val (added, updated, removed) = session.update(fetchData, toForget, 
> reqMetadata)
>   if (session.isEmpty) {
> debug(s"Created a new sessionless FetchContext and closing 
> session id ${session.id}, " +
>   s"epoch ${session.epoch}: after removing 
> ${partitionsToLogString(removed)}, " +
>   s"there are no more partitions left.")
> cache.remove(session)
> new SessionlessFetchContext(fetchData)
>   } else {
> cache.touch(session, time.milliseconds())
> session.epoch = JFetchMetadata.nextEpoch(session.epoch)
> debug(s"Created a new incremental FetchContext for session id 
> ${session.id}, " +
>   s"epoch ${session.epoch}: added 
> ${partitionsToLogString(added)}, " +
>   s"updated ${partitionsToLogString(updated)}, " +
>   s"removed ${partitionsToLogString(removed)}")
> new IncrementalFetchContext(time, reqMetadata, session)
>   }
> }
>   }
> }
>   }
> {code}
> Contention has been made worse by the solution for "KAFKA-9137: Fix incorrect 
> FetchSessionCache eviction logic" 
> (https://github.com/apache/kafka/pull/7640), as the cache is correctly 
> touched now, whereas previously the touch was being skipped.
>  
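
The remedy discussed in the linked PR is to stripe this single lock; a minimal 
sketch of lock striping over N cache shards (illustrative only, not the actual 
FetchSessionCache):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class StripedCacheSketch {
    private final Map<Integer, String>[] shards;

    @SuppressWarnings("unchecked")
    public StripedCacheSketch(int numShards) {
        shards = new Map[numShards];
        for (int i = 0; i < numShards; i++) {
            shards[i] = new HashMap<>();
        }
    }

    private Map<Integer, String> shardFor(int sessionId) {
        return shards[Math.floorMod(sessionId, shards.length)];
    }

    public void put(int sessionId, String session) {
        Map<Integer, String> shard = shardFor(sessionId);
        // Threads contend only on the shard owning this id,
        // not on one global cache-wide monitor.
        synchronized (shard) {
            shard.put(sessionId, session);
        }
    }

    public String get(int sessionId) {
        Map<Integer, String> shard = shardFor(sessionId);
        synchronized (shard) {
            return shard.get(sessionId);
        }
    }
}
{code}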



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585435927


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##
@@ -30,12 +29,7 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
  */
 private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) {
-super(type, timer);
-this.offsets = validate(offsets);
-}
-
-protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
+public CommitEvent(final Type type, Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {

Review Comment:
   Is there a reason for losing the `final` on the offsets map? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1585435466


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,332 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "1"),
+@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true")

Review Comment:
   > Not at all, actually we want to test it, my point is just that we also 
should test old coordinator + LegacyConsumer, and the way to achieve that is 
running the LegacyConsumer + NEW_GROUP_COORDINATOR_ENABLE_CONFIG=false. Makes 
sense?
   
   You are right. I overlooked the `GroupCoordinatorAdapter` :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585390985


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,9 +310,18 @@ void cleanup() {
 log.error("Unexpected error during shutdown.  Proceed with 
closing.", e);
 } finally {
 sendUnsentRequests(timer);
+
+LinkedList<ApplicationEvent> allEvents = new LinkedList<>();
+applicationEventQueue.drainTo(allEvents);
+List<CompletableApplicationEvent<?>> completableEvents = allEvents
+.stream()
+.filter(e -> e instanceof CompletableApplicationEvent)
+.map(e -> (CompletableApplicationEvent<?>) e)
+.collect(Collectors.toList());

Review Comment:
   This logic is always needed whenever we `reapIncomplete`, and it is currently 
repeated between the AsyncConsumer call site and here, so what about moving it 
into `reapIncomplete`, making it receive a list of all events and internally 
filter out the ones that are `CompletableEvent`?
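
   For illustration, a sketch of the suggested shape (hypothetical simplified 
types): one reaper entry point that accepts the raw drained events and filters 
internally:

   ```java
   import java.util.Collection;
   import java.util.List;
   import java.util.concurrent.CancellationException;
   import java.util.concurrent.CompletableFuture;
   import java.util.stream.Collectors;

   class EventReaperSketch {
       interface ApplicationEvent { }
       interface CompletableEvent<T> extends ApplicationEvent {
           CompletableFuture<T> future();
       }

       // Callers pass everything they drained; the reaper does the filtering,
       // so the instanceof/map logic lives in exactly one place.
       void reapIncomplete(Collection<ApplicationEvent> allEvents) {
           List<CompletableEvent<?>> incomplete = allEvents.stream()
                   .filter(e -> e instanceof CompletableEvent)
                   .map(e -> (CompletableEvent<?>) e)
                   .filter(e -> !e.future().isDone())
                   .collect(Collectors.toList());
           incomplete.forEach(e -> e.future().completeExceptionally(
                   new CancellationException("consumer is closing")));
       }
   }
   ```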



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1585359721


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,332 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "1"),
+@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true")

Review Comment:
   Hey @chia7712 , agree that when `NEW_GROUP_COORDINATOR_ENABLE_CONFIG` is 
true, the KRaft broker will use the new coordinator, with ZK out of the 
picture, all good there. But
   
   > we still create LegacyConsumer to test the legacy coordinator by setting 
group.protocol=classic
   
   `LegacyConsumer` does not make the legacy coordinator kick in unless 
`isNewGroupCoordinatorEnabled` is false, because the classic protocol is 
supported by both coordinators. So using the `LegacyConsumer` implies the 
classic protocol (the only one it supports), but the decision of which 
coordinator will serve the classic protocol is made based on whether the 
broker config enables the new coordinator or not. 
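
   To make the split concrete, a minimal sketch of the two knobs involved (a 
sketch only; the literal property names are, to my knowledge, what these two 
constants map to):

```java
import java.util.Properties;

public class CoordinatorConfigSketch {
    public static void main(String[] args) {
        // Broker side decides which coordinator serves the group
        // (NEW_GROUP_COORDINATOR_ENABLE_CONFIG).
        Properties brokerProps = new Properties();
        brokerProps.put("group.coordinator.new.enable", "true");

        // Client side only decides the protocol, and thus the consumer
        // impl (GROUP_PROTOCOL_CONFIG); "classic" can be served by
        // either coordinator.
        Properties consumerProps = new Properties();
        consumerProps.put("group.protocol", "classic");
    }
}
```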

[jira] [Assigned] (KAFKA-13447) Consumer should not reuse committed offset after topic recreation

2024-04-30 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-13447:
--

Assignee: Philip Nee

> Consumer should not reuse committed offset after topic recreation
> -
>
> Key: KAFKA-13447
> URL: https://issues.apache.org/jira/browse/KAFKA-13447
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer, needs-kip
>
> KAFKA-12257 fixes an issue in which the consumer is unable to make progress 
> after a topic has been recreated. The problem was that the client could not 
> distinguish between stale metadata with a lower leader epoch and a recreated 
> topic with a lower leader epoch. With TopicId support in KIP-516, the client 
> is able to tell when a topic has been recreated since the new topic will have 
> a different ID. 
> However, what the patch did not fix is the potential reuse of the current 
> offset position on the recreated topic. For example, say that the consumer is 
> at offset N when the topic gets recreated. Currently, the consumer will 
> continue fetching from offset N after detecting the recreation. The most 
> likely result of this is either an offset out of range error or a log 
> truncation error, but it is also possible for the offset position to remain 
> valid on the recreated topic (say for a low-volume topic where the offset is 
> already low, or a case where the consumer was down for a while). 
> To fix this issue completely, we need to store the topicId along with the 
> committed offset in __consumer_offsets. This would allow the consumer to 
> detect when the offset is no longer relevant for the current topic. We also 
> need to decide how to raise this case to the user. If the user has enabled 
> automatic offset reset, we can probably use that. Otherwise, we might need a 
> new exception type to signal the user that the position needs to be reset.
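
For illustration, a sketch of the check that storing the topicId alongside 
the committed offset would enable (a hypothetical helper, not an existing 
API):

{code:java}
import java.util.Objects;

import org.apache.kafka.common.Uuid;

// Hypothetical: decide whether a committed offset is still usable once the
// consumer knows which topicId the offset was committed against.
final class CommittedOffsetCheck {
    static long resolvePosition(long committedOffset,
                                Uuid committedTopicId,
                                Uuid currentTopicId,
                                long resetOffset) {
        // Same topic incarnation: the committed offset is still meaningful.
        if (Objects.equals(committedTopicId, currentTopicId)) {
            return committedOffset;
        }
        // Recreated topic: fall back to the reset policy (or surface a new
        // exception type if auto reset is disabled) instead of reusing N.
        return resetOffset;
    }
}
{code}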



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]

2024-04-30 Thread via GitHub


sidyag opened a new pull request, #15837:
URL: https://github.com/apache/kafka/pull/15837

   Allowing WriteTxnMarkers API to run with AlterCluster permissions
   
   https://issues.apache.org/jira/browse/KAFKA-16513
   
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16110) Document and publicize performance test results for AsyncKafkaConsumer

2024-04-30 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16110:
---
Issue Type: Task  (was: New Feature)

> Document and publicize performance test results for AsyncKafkaConsumer
> --
>
> Key: KAFKA-16110
> URL: https://issues.apache.org/jira/browse/KAFKA-16110
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, performance-benchmark
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16110) Document and publicize performance test results for AsyncKafkaConsumer

2024-04-30 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16110:
---
Summary: Document and publicize performance test results for 
AsyncKafkaConsumer  (was: Implement consumer performance tests)

> Document and publicize performance test results for AsyncKafkaConsumer
> --
>
> Key: KAFKA-16110
> URL: https://issues.apache.org/jira/browse/KAFKA-16110
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, performance-benchmark
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-04-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16290:
--
Priority: Major  (was: Critical)

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 4.0.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. A place like assign() is 
> probably the most obvious, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).
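
For illustration, a minimal sketch of the queue-based direction described 
above (all names are made up):

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// The app thread enqueues an event instead of mutating the shared
// subscription state directly; only the background thread applies it.
final class SubscriptionUpdateSketch {
    record SubscriptionChangeEvent(Set<String> topics) { }

    private final BlockingQueue<SubscriptionChangeEvent> eventQueue =
            new LinkedBlockingQueue<>();
    // Written only by the background thread.
    private final Set<String> subscriptionState = new HashSet<>();

    // Application thread: no direct write to subscriptionState.
    void subscribe(Set<String> topics) {
        eventQueue.add(new SubscriptionChangeEvent(topics));
    }

    // Background thread: the single writer drains and applies updates.
    void pollOnce() throws InterruptedException {
        SubscriptionChangeEvent event = eventQueue.take();
        subscriptionState.clear();
        subscriptionState.addAll(event.topics());
    }
}
{code}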



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-04-30 Thread via GitHub


gharris1727 commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1585294415


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String 
connName,
 ConnectorClientConfigRequest connectorClientConfigRequest = new 
ConnectorClientConfigRequest(
 connName, connectorType, connectorClass, clientConfigs, 
clientType);
 List configValues = 
connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
-if (configValues != null) {
-for (ConfigValue validatedConfigValue : configValues) {
-ConfigKey configKey = 
configKeys.get(validatedConfigValue.name());
-ConfigKeyInfo configKeyInfo = null;
-if (configKey != null) {
-if (configKey.group != null) {
-groups.add(configKey.group);
-}
-configKeyInfo = convertConfigKey(configKey, prefix);
-}
 
-ConfigValue configValue = new ConfigValue(prefix + 
validatedConfigValue.name(), validatedConfigValue.value(),
-  
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
-if (!configValue.errorMessages().isEmpty()) {
-errorCount++;
+return prefixedConfigInfos(configDef.configKeys(), configValues, 
prefix);
+}
+
+private static ConfigInfos prefixedConfigInfos(Map 
configKeys, List configValues, String prefix) {
+int errorCount = 0;
+Set groups = new LinkedHashSet<>();
+List configInfos = new ArrayList<>();
+
+if (configValues == null) {

Review Comment:
   I think this null check is only relevant when the value is coming from 
`overridePolicy.validate`; in `validateConverterConfig`, the 
`ConfigDef#validate` call will always return non-null.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -392,6 +399,146 @@ protected Map 
validateSourceConnectorConfig(SourceConnector
 return configDef.validateAll(config);
 }
 
+/**
+ * General-purpose validation logic for converters that are configured 
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be 
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter 
property in the connector config;
+ *  may be null, in which case no validation will 
be performed under the assumption that the
+ *  connector will inherit the converter 
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ *(e.g., {@code 
org.apache.kafka.connect.storage.Converter.class});
+ *may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a 
{@link ConfigDef}
+ *  from an instance of the plugin type (e.g., 
{@code Converter::config});
+ *  may not be null
+ * @param pluginName a lowercase, human-readable name for the type of 
plugin (e.g., {@code "key converter"});
+ *   may not be null
+ * @param pluginProperty the property used to define a custom class for 
the plugin type
+ *   in a connector config (e.g., {@link 
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ *   may not be null
+ * @param defaultProperties any default properties to include in the 
configuration that will be used for
+ *  the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for 
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no 
custom plugin was defined in the connector
+ * config)
+
+ * @param  the plugin class to perform validation for
+ */
+private  ConfigInfos validateConverterConfig(
+Map connectorConfig,
+ConfigValue pluginConfigValue,
+Class pluginInterface,
+Function configDefAccessor,
+String pluginName,
+String pluginProperty,
+Map defaultProperties
+) {
+Objects.requireNonNull(connectorConfig);
+Objects.requireNonNull(pluginInterface);
+Objects.requireNonNull(configDefAccessor);
+Objects.requireNonNull(pluginName);
+Objects.requireNonNull(pluginProperty);
+
+String pluginClass = connectorConfig.get(pluginProperty);
+
+if (pluginClass == null
+|| pluginConfigValue == null
+|| 

Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-04-30 Thread via GitHub


gaurav-narula commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2086478984

   The following images show lock profiles collected using async-profiler 
before and after this change, with numCacheShards = numIoThreads = 64, and 
demonstrate a significant reduction in contention.
   
   **Before**
   https://github.com/apache/kafka/assets/97168911/e2e1edad-7fe2-4260-908d-bc8d4395afca
   
   **After**
   https://github.com/apache/kafka/assets/97168911/8d926c4e-03e6-47e6-9367-cdd2ac89e3da
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16110) Implement consumer performance tests

2024-04-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16110:
--
Priority: Major  (was: Blocker)

> Implement consumer performance tests
> 
>
> Key: KAFKA-16110
> URL: https://issues.apache.org/jira/browse/KAFKA-16110
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, performance-benchmark
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1585308784


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,332 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "1"),
+@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true")

Review Comment:
   ### server side
   
   If `NEW_GROUP_COORDINATOR_ENABLE_CONFIG` is true, the kraft broker will 
create the new `GroupCoordinator`
   
   
https://github.com/apache/kafka/blob/1e8415160f96eb579ceaa3f89b3362f1deeccf6b/core/src/main/scala/kafka/server/BrokerServer.scala#L556
   
   It enables the broker to handle the requests used by `AsyncConsumer`. Also, 
`NEW_GROUP_COORDINATOR_ENABLE_CONFIG` does nothing to the zk broker.
   
   ### client side
   
   `GROUP_PROTOCOL_CONFIG` is used to pick the impl of `Consumer`
   
   
https://github.com/apache/kafka/blob/1e8415160f96eb579ceaa3f89b3362f1deeccf6b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java#L62
   
   > 

[PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-04-30 Thread via GitHub


gaurav-narula opened a new pull request, #15836:
URL: https://github.com/apache/kafka/pull/15836

   KIP-227 introduced in-memory caching of FetchSessions. Brokers with a large 
number of Fetch requests suffer from contention on trying to acquire a lock on 
FetchSessionCache.
   
   This change aims to reduce lock contention for FetchSessionCache by sharding 
the cache into multiple segments, each responsible for an equal range of 
sessionIds. Assuming Fetch requests have a uniform distribution of sessionIds, 
the probability of contention on a segment is reduced by a factor of the number 
of segments.
   
   We maintain backwards compatibility by ensuring that the total number of 
cache entries remains the same as configured and that sessionIds are randomly 
allocated.
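
   To make the idea concrete, a minimal sketch (the PR shards by sessionId 
range; this sketch uses modulo hashing for brevity, and the names are 
assumptions, not the PR's actual classes):

```java
// Each shard owns a disjoint subset of session ids, so two fetches only
// contend when they map to the same shard.
final class ShardedSessionCache {
    private final Object[] shardLocks;

    ShardedSessionCache(int numShards) {
        shardLocks = new Object[numShards];
        for (int i = 0; i < numShards; i++) {
            shardLocks[i] = new Object();
        }
    }

    // With uniformly distributed session ids, the probability of two
    // requests contending drops by roughly a factor of numShards.
    private Object lockFor(int sessionId) {
        return shardLocks[Math.floorMod(sessionId, shardLocks.length)];
    }

    void touch(int sessionId) {
        synchronized (lockFor(sessionId)) {
            // update only this shard's portion of the cache state
        }
    }
}
```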
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you are still facing issues, I would suggest providing 
a ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, 
and also taking a look at and sharing the broker logs to understand more 
about what's going on in your setup. Hope it helps!


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you are still facing issues, I would suggest providing 
a ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, 
and also taking a look at and sharing the broker logs to understand more 
about what's going on in your setup. Hope it helps

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> 

[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you are still facing issues, I would suggest providing 
a ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, 
and also taking a look at and sharing the broker logs to understand more 
about what's going on in your setup. Hope it helps


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

{quote}for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
topic1{quote}

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you are still facing issues, I would suggest providing 
a ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, 
and also taking a look at and sharing the broker logs to understand more 
about what's going on in your setup. Hope it helps

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> 

[jira] [Commented] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

{quote}for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
topic1{quote}

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you are still facing issues, I would suggest providing 
a ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, 
and also taking a look at and sharing the broker logs to understand more 
about what's going on in your setup. Hope it helps
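
For reference, a minimal listener along those lines (the class name here is 
made up; the callback signatures are the real ConsumerRebalanceListener API):

{code:java}
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Minimal debugging listener: prints whatever the consumer gets assigned.
public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
    }
}
{code}

It can be passed via the two-argument subscribe, e.g. 
consumer.subscribe(singletonList("test-topic1"), new 
LoggingRebalanceListener()).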

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16557) Fix OffsetFetchRequestState.toString()

2024-04-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16557:
--
Priority: Minor  (was: Major)

> Fix OffsetFetchRequestState.toString()
> --
>
> Key: KAFKA-16557
> URL: https://issues.apache.org/jira/browse/KAFKA-16557
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The code incorrectly overrides the {{toString()}} method instead of 
> overriding {{toStringBase()}}. This affects debugging and troubleshooting 
> consumer issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()

2024-04-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16558:
--
Priority: Minor  (was: Major)

> Implement HeartbeatRequestState.toStringBase()
> --
>
> Key: KAFKA-16558
> URL: https://issues.apache.org/jira/browse/KAFKA-16558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The inner class {{HeartbeatRequestState}} does not override the 
> {{toStringBase()}} method. This affects debugging and troubleshooting 
> consumer issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-04-30 Thread via GitHub


jeffkbkim commented on code in PR #15835:
URL: https://github.com/apache/kafka/pull/15835#discussion_r1585255667


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -461,26 +459,30 @@ public void testMetrics() throws Exception {
 @Test
 public void testRecordThreadIdleRatioTwoThreads() throws Exception {
 GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+Time time = Time.SYSTEM;

Review Comment:
   I used system time here because MockTime does not simulate on a per-thread 
basis



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-04-30 Thread via GitHub


jeffkbkim opened a new pull request, #15835:
URL: https://github.com/apache/kafka/pull/15835

   This PR fixes the thread idle ratio. We take a similar approach to the kafka 
request handler idle ratio: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L108-L117
   
   Instead of calculating the actual ratio per thread, we record the time each 
thread stays idle while waiting for a new event, divided by the number of 
threads as an approximation.
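
   As an illustration of the approximation (assumed names, not the PR's 
actual code):

```java
import java.util.concurrent.BlockingQueue;

// Each poller thread records its own idle wait, scaled by 1/numThreads,
// so the summed rate approximates the pool-wide idle ratio.
final class IdleTimeRecorder {
    interface IdleSensor { void record(double idleMs); }

    private final IdleSensor sensor;
    private final int numThreads;

    IdleTimeRecorder(IdleSensor sensor, int numThreads) {
        this.sensor = sensor;
        this.numThreads = numThreads;
    }

    <T> T take(BlockingQueue<T> queue) throws InterruptedException {
        long start = System.currentTimeMillis();
        T event = queue.take(); // the thread is idle while blocked here
        long idleMs = System.currentTimeMillis() - start;
        sensor.record((double) idleMs / numThreads);
        return event;
    }
}
```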
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext

2024-04-30 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula reassigned KAFKA-9401:


Assignee: Gaurav Narula

> High lock contention for kafka.server.FetchManager.newContext
> -
>
> Key: KAFKA-9401
> URL: https://issues.apache.org/jira/browse/KAFKA-9401
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Lucas Bradstreet
>Assignee: Gaurav Narula
>Priority: Major
>
> kafka.server.FetchManager.newContext takes out what is essentially a global 
> fetch lock on kafka.server.FetchSessionCache, for updates to not only the 
> FetchSessionCache but the also update the fetch sessions stored with in it. 
> This causes a high amount of lock contention for fetches, as every fetch 
> request must go through this lock.
> I have taken an async-profiler lock profile on a high throughput cluster, and 
> I see around 25s of waiting on this lock for a sixty second profile.
> {noformat}
> *— 25818577497 ns (20.84%), 5805 samples
>  [ 0] kafka.server.FetchSessionCache
>  [ 1] kafka.server.FetchManager.newContext
>  [ 2] kafka.server.KafkaApis.handleFetchRequest
>  [ 3] kafka.server.KafkaApis.handle
>  [ 4] kafka.server.KafkaRequestHandler.run
>  [ 5] java.lang.Thread.run
>  {noformat}
> FetchSession.scala:
> {code:java}
>   cache.synchronized {
> cache.get(reqMetadata.sessionId) match {
>   case None => {
> debug(s"Session error for ${reqMetadata.sessionId}: no such session 
> ID found.")
> new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, 
> reqMetadata)
>   }
>   case Some(session) => session.synchronized {
> if (session.epoch != reqMetadata.epoch) {
>   debug(s"Session error for ${reqMetadata.sessionId}: expected epoch 
> " +
> s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
>   new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, 
> reqMetadata)
> } else {
>   val (added, updated, removed) = session.update(fetchData, toForget, 
> reqMetadata)
>   if (session.isEmpty) {
> debug(s"Created a new sessionless FetchContext and closing 
> session id ${session.id}, " +
>   s"epoch ${session.epoch}: after removing 
> ${partitionsToLogString(removed)}, " +
>   s"there are no more partitions left.")
> cache.remove(session)
> new SessionlessFetchContext(fetchData)
>   } else {
> cache.touch(session, time.milliseconds())
> session.epoch = JFetchMetadata.nextEpoch(session.epoch)
> debug(s"Created a new incremental FetchContext for session id 
> ${session.id}, " +
>   s"epoch ${session.epoch}: added 
> ${partitionsToLogString(added)}, " +
>   s"updated ${partitionsToLogString(updated)}, " +
>   s"removed ${partitionsToLogString(removed)}")
> new IncrementalFetchContext(time, reqMetadata, session)
>   }
> }
>   }
> }
>   }
> {code}
> Contention has been made worse by the solution for "KAFKA-9137: Fix incorrect 
> FetchSessionCache eviction logic" 
> ([https://github.com/apache/kafka/pull/7640]), as the cache is correctly 
> touched now, whereas previously the touch was being skipped.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]

2024-04-30 Thread via GitHub


Alexander-Aghili commented on PR #15055:
URL: https://github.com/apache/kafka/pull/15055#issuecomment-2086139400

   Okay, I messed up git a little here (still have much to learn). The 
current trunk does exist on my Kafka-16027 branch, but I had to redo it with 
a force push, which led to this PR being auto-closed. Any recommendation on 
what to do next?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]

2024-04-30 Thread via GitHub


Alexander-Aghili closed pull request #15055: KAFKA-16027: MINOR Refactor 
MetadataTest#testUpdatePartitionLeadership
URL: https://github.com/apache/kafka/pull/15055


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16000: Migrate MembershipManagerImplTest away from ConsumerTestBuilder [kafka]

2024-04-30 Thread via GitHub


kirktrue closed pull request #14950: KAFKA-16000: Migrate 
MembershipManagerImplTest away from ConsumerTestBuilder
URL: https://github.com/apache/kafka/pull/14950


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-04-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16623:
--
Description: 
When running system tests for the KafkaAsyncConsumer, we occasionally see this 
warning:

{noformat}
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 38, in _protected_worker
self._worker(idx, node)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 304, in _worker
handler.handle_partitions_revoked(event, node, self.logger)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 163, in handle_partitions_revoked
(tp, node.account.hostname)
AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) 
cannot be revoked from worker20 as it was not previously assigned to that 
consumer
{noformat}

In test_fencing_static_consumer, there are two sets of consumers that use group 
instance IDs: the initial set and the "conflict" set. It appears that one of 
the "conflicting" consumers hijacks the partition ownership from the 
coordinator's perspective when the initial consumer leaves the group.

  was:
When running system tests for the KafkaAsyncConsumer, we occasionally see this 
warning:

{noformat}
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 38, in _protected_worker
self._worker(idx, node)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 304, in _worker
handler.handle_partitions_revoked(event, node, self.logger)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 163, in handle_partitions_revoked
(tp, node.account.hostname)
AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) 
cannot be revoked from worker20 as it was not previously assigned to that 
consumer
{noformat}

In 


> KafkaAsyncConsumer system tests warn about revoking partitions that weren't 
> previously assigned
> ---
>
> Key: KAFKA-16623
> URL: https://issues.apache.org/jira/browse/KAFKA-16623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> When running system tests for the KafkaAsyncConsumer, we occasionally see 
> this warning:
> {noformat}
>   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
> self.run()
>   File "/usr/lib/python3.7/threading.py", line 865, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
>  line 38, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 304, in _worker
> handler.handle_partitions_revoked(event, node, self.logger)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 163, in handle_partitions_revoked
> (tp, node.account.hostname)
> AssertionError: Topic partition TopicPartition(topic='test_topic', 
> partition=0) cannot be revoked from worker20 as it was not previously 
> assigned to that consumer
> {noformat}
> In test_fencing_static_consumer, there are two sets of consumers that use 
> group instance IDs: the initial set and the "conflict" set. It appears that 
> one of the "conflicting" consumers hijacks the partition ownership from the 
> coordinator's perspective when the initial consumer leaves the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-04-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16623:
--
Description: 
When running system tests for the KafkaAsyncConsumer, we occasionally see this 
warning:

{noformat}
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 38, in _protected_worker
self._worker(idx, node)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 304, in _worker
handler.handle_partitions_revoked(event, node, self.logger)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 163, in handle_partitions_revoked
(tp, node.account.hostname)
AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) 
cannot be revoked from worker20 as it was not previously assigned to that 
consumer
{noformat}

In 

  was:
When running system tests for the KafkaAsyncConsumer, we occasionally see this 
warning:

{noformat}
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 38, in _protected_worker
self._worker(idx, node)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 304, in _worker
handler.handle_partitions_revoked(event, node, self.logger)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 163, in handle_partitions_revoked
(tp, node.account.hostname)
AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) 
cannot be revoked from worker20 as it was not previously assigned to that 
consumer
{noformat}

It is unclear what is causing this.


> KafkaAsyncConsumer system tests warn about revoking partitions that weren't 
> previously assigned
> ---
>
> Key: KAFKA-16623
> URL: https://issues.apache.org/jira/browse/KAFKA-16623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> When running system tests for the KafkaAsyncConsumer, we occasionally see 
> this warning:
> {noformat}
>   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
> self.run()
>   File "/usr/lib/python3.7/threading.py", line 865, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
>  line 38, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 304, in _worker
> handler.handle_partitions_revoked(event, node, self.logger)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 163, in handle_partitions_revoked
> (tp, node.account.hostname)
> AssertionError: Topic partition TopicPartition(topic='test_topic', 
> partition=0) cannot be revoked from worker20 as it was not previously 
> assigned to that consumer
> {noformat}
> In 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-04-30 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842453#comment-17842453
 ] 

Kirk True commented on KAFKA-16623:
---

If I add the following code to {{test_fencing_static_consumer}} before stopping 
the consumer, the test runs:

{code:python}
# Make sure the conflicting consumers are all dead, but then go ahead and
# stop them all to ensure that everything is cleanly stopped, otherwise we
# may get spurious errors
assert len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes), \
    "Conflicting consumers should all have received errors on startup and quit"
conflict_consumer.stop_all()
wait_until(lambda: len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
           timeout_sec=self.session_timeout_sec+5,
           err_msg="Timed out waiting for the conflict consumer to shutdown")
{code}

 I'm still investigating, though, as I'm not sure if this is a "fix" or if it 
"masks" a real issue.

> KafkaAsyncConsumer system tests warn about revoking partitions that weren't 
> previously assigned
> ---
>
> Key: KAFKA-16623
> URL: https://issues.apache.org/jira/browse/KAFKA-16623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> When running system tests for the KafkaAsyncConsumer, we occasionally see 
> this warning:
> {noformat}
>   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
> self.run()
>   File "/usr/lib/python3.7/threading.py", line 865, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
>  line 38, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 304, in _worker
> handler.handle_partitions_revoked(event, node, self.logger)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 163, in handle_partitions_revoked
> (tp, node.account.hostname)
> AssertionError: Topic partition TopicPartition(topic='test_topic', 
> partition=0) cannot be revoked from worker20 as it was not previously 
> assigned to that consumer
> {noformat}
> It is unclear what is causing this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-30 Thread via GitHub


kirktrue commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2086027819

   Thanks everyone for the reviews and @lucasbru for the merge!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]

2024-04-30 Thread via GitHub


chiacyu commented on code in PR #15823:
URL: https://github.com/apache/kafka/pull/15823#discussion_r1585188342


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -584,27 +584,6 @@ public static String formatBytes(long bytes) {
 }
 }
 
-/**
- * Create a string representation of an array joined by the given separator
- * @param strs The array of items
- * @param separator The separator
- * @return The string representation.
- */
-public static <T> String join(T[] strs, String separator) {
-return join(Arrays.asList(strs), separator);
-}
-
-/**
- * Create a string representation of a collection joined by the given 
separator
- * @param collection The list of items
- * @param separator The separator
- * @return The string representation.
- */
-public static <T> String join(Collection<T> collection, String separator) {
-Objects.requireNonNull(collection);
-return mkString(collection.stream(), "", "", separator);
-}
-
 /**
  * Create a string representation of a stream surrounded by `begin` and 
`end` and joined by `separator`.
  *

Review Comment:
   Thanks for the reply. Suggestions applied. 
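
   For readers following the change, a minimal sketch (not taken from the PR 
itself) of how callers of the removed `Utils.join` overloads map onto the JDK 
API; the class name here is illustrative only:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoinMigrationExample {
    public static void main(String[] args) {
        List<String> topics = Arrays.asList("alpha", "beta", "gamma");
        // Utils.join(topics, ", ") becomes String.join for CharSequence elements:
        String joined = String.join(", ", topics);
        // For arbitrary element types, Collectors.joining matches the old
        // behavior of calling toString() on each element:
        List<Integer> partitions = Arrays.asList(0, 1, 2);
        String joinedPartitions = partitions.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(", "));
        System.out.println(joined);            // alpha, beta, gamma
        System.out.println(joinedPartitions);  // 0, 1, 2
    }
}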



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-04-30 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-2085912341

   @gharris1727 I've resolved the merge conflicts again; can you please take a 
look when you get a chance?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16647: Remove setMetadataDirectory from BrokerNode/ControllerNode [kafka]

2024-04-30 Thread via GitHub


brandboat opened a new pull request, #15833:
URL: https://github.com/apache/kafka/pull/15833

   related to https://issues.apache.org/jira/browse/KAFKA-16647
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-30 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1585160959


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.

Review Comment:
   I thought you wanted it above the `if` statement in the last comment, haha; 
I think it was there originally.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-30 Thread via GitHub


dajac commented on PR #15798:
URL: https://github.com/apache/kafka/pull/15798#issuecomment-2085866313

   @dongnuo123 Be aware of https://github.com/apache/kafka/pull/15785. The PR 
changes code that you have refactored or reused in this one. We will need to 
adapt when we merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-30 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1585158392


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
+Map<String, Integer> subscribedTopicNamesMap = group.subscribedTopicNames();

Review Comment:
   we can't, because the list is also named `subscribedTopicNames`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-30 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1585155736


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,55 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map<String, Integer> computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * If all the members are subscribed to the same set of topics, the type 
is homogeneous.
+ * Otherwise, it is heterogeneous.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;

Review Comment:
   nit: Let's add an empty line before this one in order to match the style in 
the file.
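
As background for the javadoc above, a hedged sketch of the 
homogeneous/heterogeneous computation it describes (class and method names 
here are assumptions, not the PR's exact code):

import java.util.Map;

enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }

final class SubscriptionTypeSketch {
    // The group is homogeneous when every topic in the map is subscribed to
    // by every member; any smaller count means two members' subscriptions differ.
    static SubscriptionType subscriptionType(Map<String, Integer> subscribedTopicNames,
                                             int numberOfMembers) {
        if (subscribedTopicNames.isEmpty()) {
            return SubscriptionType.HOMOGENEOUS;
        }
        for (int subscriberCount : subscribedTopicNames.values()) {
            if (subscriberCount != numberOfMembers) {
                return SubscriptionType.HETEROGENEOUS;
            }
        }
        return SubscriptionType.HOMOGENEOUS;
    }
}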



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
+Map<String, Integer> subscribedTopicNamesMap = group.subscribedTopicNames();

Review Comment:
   nit: Should we use `subscribedTopicNames` too?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.

Review Comment:
   nit: Could we move it to right before `subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,55 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map<String, Integer> computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * If all the members are subscribed to the same set of topics, the type 
is homogeneous.
+ * Otherwise, it is heterogeneous.

Review Comment:
   nit: We could remove this as it is already in the `@return`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-30 Thread via GitHub


philipnee commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2085841809

   Hey sorry for the delay, the changes look good to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585152310


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection 
topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor 
eventProcessor,

Review Comment:
   Not introduced by this PR, but reviewing this processing I don't quite see 
the value in all [these 
lines](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1910-L1915), 
which are even repeated further down, just for a log, when in practice both 
are the happy path that will have 
[this](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1506) 
log from the unsubscribe. A one-liner with `return 
ConsumerUtils.getResult(future);` would achieve the same and make the function 
much simpler.
   (Even if we end up using this from a function other than unsubscribe, it 
seems overkill to have all this code for something we don't need now, or know 
whether we'll ever need.)
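
For illustration, the suggested one-liner boils down to blocking on the future 
and surfacing its error. A hedged stand-in for the helper (the real 
`ConsumerUtils.getResult` additionally wraps failures in Kafka-specific 
exceptions):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class GetResultSketch {
    // Block on the future; unwrap an ExecutionException to its cause.
    static <T> T getResult(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}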



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]

2024-04-30 Thread via GitHub


dajac merged PR #15818:
URL: https://github.com/apache/kafka/pull/15818


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585035057


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1853,6 +1824,40 @@ private void subscribeInternal(Collection 
topics, Optional processor) {

Review Comment:
   what about renaming this to be explicit about what we process here? It gets 
confusing given that at this consumer level we're dealing with app events and 
background events. 
   
   `processBackgroundEvents` feels pretty clear, and I know there is already 
another one called like that, but the other one is more of an 
`awaitFutureProcessingBackgroundEvents`, because it actually blocks for a 
time and is only used from the unsubscribe, so maybe rename here and there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585091606


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection 
topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor 
eventProcessor,

Review Comment:
   Also, regarding:
   
   > as part of this rebalancing work, the {@link 
ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be 
invoked
   
   it does not need to be if the unsubscribing consumer does not own any 
partitions, so just for accuracy in the example I would suggest extending it 
with "...needs to be invoked for the partitions the consumer owns"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585083395


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection 
topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor 
eventProcessor,

Review Comment:
   Regarding the func doc, typo and clarification:
   
   > When the application thread sees ..., it is processed, and **then a ...is 
then** enqueued by the application thread **on the background event queue**
   
   The app thread enqueues the event in an **application event queue** (that 
the background thread consumes), right? In the doc we ended up mentioning the 
background and app thread both adding to the background event queue. 
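
   To make the direction of the handoff concrete, a hedged sketch of the two 
queues as described in this thread (field names are assumptions):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class EventQueuesSketch {
    // Application thread produces, background thread consumes.
    final BlockingQueue<Object> applicationEventQueue = new LinkedBlockingQueue<>();
    // Background thread produces, application thread consumes.
    final BlockingQueue<Object> backgroundEventQueue = new LinkedBlockingQueue<>();
}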



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842423#comment-17842423
 ] 

Matthias J. Sax commented on KAFKA-16382:
-

Not yet from our side... Working on other things atm. Not sure when we will be 
able to pick it up, or if anybody from the community wants to take it.

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* 2 messages "A1:anull, 
> A1:ab"
>  # Expected output *3 messages* "A1:anull, A1:ab, {*}A1:{*}"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.
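
Since the report notes the issue disappears with caching disabled, here is a 
minimal sketch of how the cache can be turned off while debugging (assuming 
Streams 3.4+, where the config is named {{statestore.cache.max.bytes}}):

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class DisableCacheExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "nullproblem-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A cache size of 0 disables record buffering, so tombstones are
        // forwarded downstream immediately instead of being absorbed by the cache.
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
        return props;
    }
}
{code}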



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585004533


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,

Review Comment:
   responsible for events that "are being processed", right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
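
For context on the reaper reviewed in this thread, a hedged usage sketch of 
the same two-step pattern (simplified types; `TrackedEvent` is a stand-in for 
the PR's `CompletableEvent`):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Stand-in for the PR's CompletableEvent: a future paired with a deadline.
class TrackedEvent {
    final CompletableFuture<String> future = new CompletableFuture<>();
    final long deadlineMs;
    TrackedEvent(long deadlineMs) { this.deadlineMs = deadlineMs; }
}

class SimpleReaper {
    private final List<TrackedEvent> tracked = new ArrayList<>();

    void add(TrackedEvent event) { tracked.add(event); }

    void reapExpiredAndCompleted(long currentTimeMs) {
        // Step 1: complete overdue, still-incomplete events exceptionally.
        tracked.stream()
                .filter(e -> !e.future.isDone())
                .filter(e -> currentTimeMs > e.deadlineMs)
                .forEach(e -> e.future.completeExceptionally(
                        new TimeoutException("event expired")));
        // Step 2: drop every completed event so no stale references remain.
        tracked.removeIf(e -> e.future.isDone());
    }
}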



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-30 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585003046


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. 
This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper<T extends CompletableEvent<?>> {
+
+private final Logger log;
+
+/**
+ * List of tracked events that we are candidates to expire or cancel when 
reviewed.
+ */
+private final List<T> tracked;
+
+public CompletableEventReaper(LogContext logContext) {
+this.log = logContext.logger(CompletableEventReaper.class);
+this.tracked = new ArrayList<>();
+}
+
+/**
+ * Adds a new {@link CompletableEvent event} to track for later 
completion/expiration.
+ *
+ * @param event Event to track
+ */
+public void add(T event) {
+tracked.add(Objects.requireNonNull(event, "Event to track must be 
non-null"));
+}
+
+/**
+ * This method "completes" any {@link CompletableEvent}s that have either 
expired or completed normally. So this
+ * is a two-step process:
+ *
+ * <ol>
+ *     <li>
+ *         For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+ *         instance of {@link TimeoutException} is created and passed to
+ *         {@link CompletableFuture#completeExceptionally(Throwable)}.
+ *     </li>
+ *     <li>
+ *         For each tracked event of which its {@link CompletableEvent#future() future} is already in the
+ *         {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+ *     </li>
+ * </ol>
+ *
+ * <p>
+ *
+ * This method should be called at regular intervals, based upon the needs 
of the resource that owns the reaper.
+ *
+ * @param currentTimeMs Current time with which to compare 
against the
+ *  {@link CompletableEvent#deadlineMs() 
expiration time}
+ */
+public void reapExpiredAndCompleted(long currentTimeMs) {
+log.trace("Reaping expired events");
+
+Consumer<CompletableEvent<?>> timeoutEvent = e -> {
+TimeoutException error = new TimeoutException(String.format("%s 
could not be completed within its timeout", e.getClass().getSimpleName()));
+long pastDueMs = currentTimeMs - e.deadlineMs();
+log.debug("Completing event {} exceptionally since it expired {} 
ms ago", e, pastDueMs);
+CompletableFuture<?> f = e.future();
+f.completeExceptionally(error);
+};
+
+// First, complete (exceptionally) any events that have passed their 
deadline AND aren't already complete.
+tracked.stream()
+.filter(e -> !e.future().isDone())
+.filter(e -> currentTimeMs > e.deadlineMs())
+.forEach(timeoutEvent);
+// Second, remove any events that are already complete, just to make 
sure we don't hold references. This will
+// include any events that finished successfully as well as any events 
we just completed exceptionally above.
+tracked.removeIf(e -> e.future().isDone());

Review Comment:
   Couldn't we make the same add operation remove the event `whenComplete`? 
Seems tighter that the same operation that adds the event ensures 