[jira] [Updated] (KAFKA-16251) Fenced member should not send heartbeats while waiting for onPartitionsLost to complete

2024-02-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16251:
--
Priority: Critical  (was: Major)

> Fenced member should not send heartbeats while waiting for onPartitionsLost 
> to complete
> ---
>
> Key: KAFKA-16251
> URL: https://issues.apache.org/jira/browse/KAFKA-16251
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
>
> When a member gets fenced, it transitions to the FENCED state and triggers the 
> onPartitionsLost callback to release its assignment. Members should stop 
> sending heartbeats while FENCED, and resume sending them only after the 
> callback completes, when they transition to JOINING.
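A minimal sketch of the intended gating (hypothetical names, not the actual HeartbeatRequestManager/MembershipManager API): heartbeats stop as soon as the member is FENCED and resume only once onPartitionsLost has completed and the member moves to JOINING.

```java
// Hypothetical sketch of the desired behavior; names do not match the real client internals.
enum MemberState { STABLE, FENCED, JOINING }

final class HeartbeatGate {
    private volatile MemberState state = MemberState.STABLE;

    void onFenced(Runnable onPartitionsLostCallback) {
        state = MemberState.FENCED;        // heartbeats must stop immediately
        onPartitionsLostCallback.run();    // release the assignment
        state = MemberState.JOINING;       // only now may heartbeats resume (to rejoin)
    }

    boolean shouldSendHeartbeat() {
        return state != MemberState.FENCED;
    }
}
```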



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-02-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Priority: Major  (was: Critical)

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support, regex
> Fix For: 3.8.0
>
>
> The new consumer should support subscribing with the new SubscriptionPattern 
> introduced by the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the heartbeat (HB) request in 
> the SubscribedTopicRegex field, delegating the resolution to the server.
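For illustration, a sketch of how the consumer-side usage is expected to look. The SubscriptionPattern API described here is still being added, so class names, packages, and signatures may differ from what finally ships.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Re2jSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-demo");
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // new group protocol (KIP-848)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The RE2/J regex is sent as-is in the heartbeat's SubscribedTopicRegex
            // field; the broker resolves it to the matching topics.
            consumer.subscribe(new SubscriptionPattern("orders-.*"));
        }
    }
}
```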



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription

2024-02-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15538:
--
Priority: Blocker  (was: Critical)

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When subscribe with a SubscriptionPattern is used, the client should just 
> send the regex to the broker, and it will be resolved on the server side.
>  * When subscribe with a Pattern is used, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> PlainTextAsyncConsumer that relate to subscription with a pattern and that 
> are currently disabled for the new consumer + new protocol.
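A minimal sketch of the client-side resolution described in the second bullet (not the actual consumer internals): the java `Pattern` is matched against the topic names from the latest metadata, and the resulting explicit list is what gets sent to the broker.

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class ClientSideRegexResolution {
    /** Resolves a java.util.regex.Pattern subscription into an explicit topic list. */
    static Set<String> resolve(Pattern subscribedPattern, Set<String> topicsFromMetadata) {
        return topicsFromMetadata.stream()
                .filter(topic -> subscribedPattern.matcher(topic).matches())
                .collect(Collectors.toSet());
    }
}
```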



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16286; Notify listener of latest leader and epoch [kafka]

2024-02-20 Thread via GitHub


jsancio commented on code in PR #15397:
URL: https://github.com/apache/kafka/pull/15397#discussion_r1496389863


##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -2958,6 +2958,59 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
 assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
 }
 
+@Test
+public void testHandleLeaderChangeFiresAfterUnattachedRegistration() throws Exception {
+int localId = 0;
+int otherNodeId = 1;
+int epoch = 7;
+Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+.withUnknownLeader(epoch)
+.build();
+
+// Register another listener and verify that it is notified of the latest epoch
+RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(
+OptionalInt.of(localId)
+);
+context.client.register(secondListener);
+context.client.poll();
+
+// Expected leader change notification
+LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), epoch);
+assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+
+// Transition to follower and observe the leader change
+context.deliverRequest(context.beginEpochRequest(epoch, otherNodeId));
+context.pollUntilResponse();
+
+// Expected leader change notification
+expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(otherNodeId), epoch);
+assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+}
+
+@Test
+public void testHandleLeaderChangeFiresAfterFollowerRegistration() throws Exception {
+int localId = 0;
+int otherNodeId = 1;
+int epoch = 7;
+Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+.withElectedLeader(epoch, otherNodeId)
+.build();
+
+// Register another listener and verify that it is notified of the latest epoch
+RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(
+OptionalInt.of(localId)
+);
+context.client.register(secondListener);
+context.client.poll();
+
+LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(otherNodeId), epoch);
+assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+}

Review Comment:
   Both of these tests fail against trunk.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16286; Notify listener of latest leader and epoch [kafka]

2024-02-20 Thread via GitHub


jsancio opened a new pull request, #15397:
URL: https://github.com/apache/kafka/pull/15397

   KRaft was only notifying listeners of the latest leader and epoch when the 
replica transitioned to a new state. This could result in a listener never 
getting notified if it registered after the replica had become a follower.
   
   This problem doesn't exist for the active leader, because the KRaft 
implementation attempts to notify the listener of the latest leader and epoch 
while the replica is the active leader.
   
   This issue is fixed by notifying the listeners of the latest leader and 
epoch after processing the listener registration request.
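   For illustration, a minimal sketch of the fix described above (hypothetical types; the real KafkaRaftClient fires the callback from its poll loop and tracks per-listener state):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

final class RegistrationNotifier {
    record LeaderAndEpoch(OptionalInt leaderId, int epoch) { }

    interface Listener { void handleLeaderChange(LeaderAndEpoch leaderAndEpoch); }

    private final List<Listener> listeners = new ArrayList<>();
    private LeaderAndEpoch current = new LeaderAndEpoch(OptionalInt.empty(), 0);

    void register(Listener listener) {
        listeners.add(listener);
        // The fix: push the latest leader and epoch as soon as the registration is
        // processed, instead of waiting for the next state transition.
        listener.handleLeaderChange(current);
    }

    void onStateTransition(LeaderAndEpoch newLeaderAndEpoch) {
        current = newLeaderAndEpoch;
        listeners.forEach(l -> l.handleLeaderChange(current));
    }
}
```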
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove unnecessary logging [kafka]

2024-02-20 Thread via GitHub


mjsax commented on code in PR #15396:
URL: https://github.com/apache/kafka/pull/15396#discussion_r1496379447


##
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java:
##
@@ -264,7 +264,6 @@ public void put(final Bytes key,
 final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
 if (segment == null) {
 expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
-LOG.warn("Skipping record for expired segment.");

Review Comment:
   I don't see much value in it. We don't log any detailed information -- the log 
line only gives you a processing-time timestamp and nothing more.
   
   If we wanted to make it useful, we could add offset/timestamp metadata (but 
not log the key/value, as we never log data) -- in that case, it might be 
worth keeping it at DEBUG or TRACE.
   
   Let's hear from others about it.
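   For reference, one possible shape of the more informative DEBUG line sketched above (hypothetical; it assumes the store's context exposes topic/partition/offset/timestamp accessors for the current record):

```java
if (segment == null) {
    expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
    // Record coordinates only -- never the key/value payload.
    LOG.debug("Skipping record for expired segment. topic=[{}] partition=[{}] offset=[{}] timestamp=[{}]",
        context.topic(), context.partition(), context.offset(), context.timestamp());
}
```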



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove unnecessary logging [kafka]

2024-02-20 Thread via GitHub


kpatelatwork commented on code in PR #15396:
URL: https://github.com/apache/kafka/pull/15396#discussion_r1496373291


##
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java:
##
@@ -264,7 +264,6 @@ public void put(final Bytes key,
 final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
 if (segment == null) {
 expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
-LOG.warn("Skipping record for expired segment.");

Review Comment:
   Is there value in converting it to DEBUG instead of removing it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-20 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496360067


##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -81,11 +87,16 @@ public Set filteredProducerIds() {
 return filteredProducerIds;
 }
 
+public long getDurationFilter() {
+return durationFilter;
+}
+
 @Override
 public String toString() {
 return "ListTransactionsOptions(" +
 "filteredStates=" + filteredStates +
 ", filteredProducerIds=" + filteredProducerIds +
+", durationFilter=" + durationFilter +

Review Comment:
   updated in 
[8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -81,11 +87,16 @@ public Set filteredProducerIds() {
 return filteredProducerIds;
 }
 
+public long getDurationFilter() {

Review Comment:
   updated in 
[8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -61,6 +62,11 @@ public ListTransactionsOptions 
filterProducerIds(Collection producerIdFilt
 return this;
 }
 
+public ListTransactionsOptions durationFilter(long durationMs) {

Review Comment:
   updated in 
[8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-20 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1496355495


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Thanks for the advice. We still want to avoid loading a huge topic into 
memory, but the current implementation does not really achieve that. I will make 
a change to address this.
   Maybe it is a bit of over-engineering to have more than one ongoing call in 
the pipeline. I will keep at most one call in flight in the next commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]

2024-02-20 Thread via GitHub


ahuang98 commented on PR #14206:
URL: https://github.com/apache/kafka/pull/14206#issuecomment-1954880081

   Thanks @mimaison, it looks like this is because I split out the TopicsImageTest 
changes into a separate PR. The migration tests depend on the `DELTA1_RECORDS` 
defined there, so I moved the changes over from TopicsImageTest as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: remove unnecessary logging [kafka]

2024-02-20 Thread via GitHub


mjsax opened a new pull request, #15396:
URL: https://github.com/apache/kafka/pull/15396

   We already record dropped records via metrics, and logging at WARN level is 
too noisy. This PR removes the unnecessary logging.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-20 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496339739


##
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java:
##
@@ -187,14 +187,25 @@ private void testDescribeProducers(
 assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
 }
 
-@Test
-public void testListTransactions() throws Exception {
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testListTransactions(boolean hasDurationFilter) throws Exception {
 String[] args = new String[] {
 "--bootstrap-server",
 "localhost:9092",
 "list"
 };
 
+if (hasDurationFilter) {
+args = new String[] {
+"--bootstrap-server",
+"localhost:9092",
+"list",
+"--duration-filter",
+Long.toString(Long.MAX_VALUE)

Review Comment:
   This just tests that the CLI command parses the parameters correctly. It will 
not call the actual list-transactions API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16286) KRaft doesn't always notify listener of latest leader

2024-02-20 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16286:
---
Description: 
If a listener registers with RaftClient after the KRaft replica has transitioned 
to follower, it will not get notified of the current leader until the replica has 
transitioned to another state.

In a stable cluster, the listeners that are not the leader (inactive controllers 
and brokers) will only get notified when the leader changes.

  was:
If a listener register with RaftClient after the KRaft replica has transition 
to follower it will not get notified of the current leader until it has 
transitioned to another state.

In a stable cluster the listeners that are not the active leader (inactive 
controllers and brokers) will only get notified when then leader changes.


> KRaft doesn't always notify listener of latest leader
> -
>
> Key: KAFKA-16286
> URL: https://issues.apache.org/jira/browse/KAFKA-16286
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
>
> If a listener registers with RaftClient after the KRaft replica has 
> transitioned to follower, it will not get notified of the current leader until 
> the replica has transitioned to another state.
> In a stable cluster, the listeners that are not the leader (inactive 
> controllers and brokers) will only get notified when the leader changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16286) KRaft doesn't always notify listener of latest leader

2024-02-20 Thread Jira
José Armando García Sancio created KAFKA-16286:
--

 Summary: KRaft doesn't always notify listener of latest leader
 Key: KAFKA-16286
 URL: https://issues.apache.org/jira/browse/KAFKA-16286
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


If a listener registers with RaftClient after the KRaft replica has transitioned 
to follower, it will not get notified of the current leader until the replica has 
transitioned to another state.

In a stable cluster, the listeners that are not the active leader (inactive 
controllers and brokers) will only get notified when the leader changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-20 Thread via GitHub


artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1495047191


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java:
##
@@ -36,28 +38,38 @@
 public class DescribeTopicsResult {
 private final Map> topicIdFutures;
 private final Map> nameFutures;
+private final Iterator>> 
nameFuturesIterator;
 
 @Deprecated
 protected DescribeTopicsResult(Map> 
futures) {
-this(null, futures);
+this(null, futures, null);
 }
 
 // VisibleForTesting
-protected DescribeTopicsResult(Map> 
topicIdFutures, Map> nameFutures) {
-if (topicIdFutures != null && nameFutures != null)
-throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be specified.");
-if (topicIdFutures == null && nameFutures == null)
-throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be null.");
+protected DescribeTopicsResult(
+Map> topicIdFutures,
+Map> nameFutures,
+Iterator>> 
nameFuturesIterator
+) {
+if (topicIdFutures != null && nameFutures != null && 
nameFuturesIterator != null)
+throw new IllegalArgumentException("topicIdFutures and nameFutures 
and nameFutureIterator cannot both be specified.");

Review Comment:
   nit: using "both" implies that there are 2 things, we now have 3.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Didn't quite get how the recurring call framework enables lazy generation? 
The RecurringCall.run() just runs the whole thing in one go.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Looking at the current logic, I cannot make sense of it. For some reason 
we get all topic names that fit under the batch size and then get all 
partitions for those topics and put them in memory.
   
   Why did we bother with all the paging complexity if all we do is get all 
partitions for 2000 topics into memory? Can we just iterate topic by topic (as 
it seems we get the list of all topics anyway) and get the partitions for 
each one?
   
   With the paging, I would've expected that we did something like this:
   
   1. start iteration -- issue one call, once the first call is completed maybe 
issue a few more (say 3) calls ahead for pipelining.
   2. next  -- iterate over current batch.
   3. once the current batch is exhausted, switch to the next batch, pipeline 
one more call.
   
   This way if the caller finishes iteration early or if iteration is slow, we 
won't keep piling results in memory.
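   A generic sketch of that pipelined iteration (hypothetical `fetchPage` function and page shape, not the KafkaAdminClient internals): only a bounded number of pages are kept in flight, and one more call is issued as each batch is consumed.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class PagedIterator<T> implements Iterator<T> {
    private final Function<Integer, CompletableFuture<List<T>>> fetchPage; // page index -> async page
    private final int maxInFlight;
    private final Deque<CompletableFuture<List<T>>> inFlight = new ArrayDeque<>();
    private Iterator<T> currentPage = Collections.emptyIterator();
    private int nextPageIndex = 0;
    private boolean lastPageSeen = false;

    PagedIterator(Function<Integer, CompletableFuture<List<T>>> fetchPage, int maxInFlight) {
        this.fetchPage = fetchPage;
        this.maxInFlight = maxInFlight;
        fill(); // issue the first call(s) up front
    }

    // Keep at most maxInFlight calls outstanding ("pipeline one more call").
    private void fill() {
        while (!lastPageSeen && inFlight.size() < maxInFlight) {
            inFlight.add(fetchPage.apply(nextPageIndex++));
        }
    }

    @Override
    public boolean hasNext() {
        while (!currentPage.hasNext() && !inFlight.isEmpty()) {
            List<T> page = inFlight.poll().join(); // block only for the oldest outstanding page
            if (page.isEmpty()) {
                lastPageSeen = true;               // an empty page ends the iteration
            } else {
                currentPage = page.iterator();
            }
            fill();
        }
        return currentPage.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return currentPage.next();
    }
}
```

   If the caller stops iterating early or consumes slowly, nothing beyond the bounded prefetch accumulates in memory, which is the property described above.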



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) 
throws ExecutionException, I
 } else {
 ensureTopicExists(topics, opts.topic().orElse(""), 
!opts.ifExists());
 }
-List 
topicDescriptions = new ArrayList<>();
 
 if (!topicIds.isEmpty()) {
 Map 
descTopics =
 
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+describeTopicsFollowUp(new ArrayList<>(descTopics.values()), 
opts);
+return;
 }
 
 if (!topics.isEmpty()) {
-Map 
descTopics =
-
adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+final int partitionSizeLimit = 
opts.partitionSizeLimitPerResponse().orElse(2000);
+try {
+Iterator>> 

[jira] [Assigned] (KAFKA-14747) FK join should record discarded subscription responses

2024-02-20 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-14747:
---

Assignee: Ayoub Omari  (was: Koma Zhang)

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Ayoub Omari
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response that might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> adding a new sensor.
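A minimal sketch of the check and the proposed recording (hypothetical names; in Streams this lives in the FK-join subscription-response processor and would use the existing dropped-records sensor):

```java
// Hypothetical sketch of the stale-response check; not the actual Streams code.
import java.util.Arrays;

final class FkJoinResponseCheck {
    interface DroppedRecordsSensor { void record(); }

    private final DroppedRecordsSensor droppedRecordsSensor;

    FkJoinResponseCheck(DroppedRecordsSensor droppedRecordsSensor) {
        this.droppedRecordsSensor = droppedRecordsSensor;
    }

    /** Returns true if the join response is still valid for the current left-hand-side record. */
    boolean accept(byte[] currentLeftHash, byte[] hashInResponse) {
        boolean stillValid = Arrays.equals(currentLeftHash, hashInResponse);
        if (!stillValid) {
            // The left-hand-side record changed while the subscription round-tripped:
            // the response is stale, so drop it -- and, per this ticket, record the drop.
            droppedRecordsSensor.record();
        }
        return stillValid;
    }
}
```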



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-14747: record discarded FK join subscription responses [kafka]

2024-02-20 Thread via GitHub


AyoubOm opened a new pull request, #15395:
URL: https://github.com/apache/kafka/pull/15395

   *As described in KAFKA-14747, we are not recording discarded FK join 
responses when a join response is received for an old record (whose hash 
value has changed in the left table). This PR records such drops with the 
dropped-records sensor.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios

2024-02-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16111:
-

Assignee: Lucas Brutschy

> Implement tests for tricky rebalance callback scenarios
> ---
>
> Key: KAFKA-16111
> URL: https://issues.apache.org/jira/browse/KAFKA-16111
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lucas Brutschy
>Priority: Blocker
>  Labels: callback, consumer-threading-refactor, integration-tests
> Fix For: 3.8.0
>
>
> There is justified concern that the new threading model may not play well 
> with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide 
> some assurance that it will support complicated patterns.
>  # Design and implement test scenarios
>  # Update and document any design changes with the callback sub-system where 
> needed
>  # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by 
> said design



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer

2024-02-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16285:
-

Assignee: Bruno Cadonna

> Make group metadata available when a new assignment is set in async Kafka 
> consumer
> --
>
> Key: KAFKA-16285
> URL: https://issues.apache.org/jira/browse/KAFKA-16285
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: kip-848-client-support
>
> Currently, the new async Kafka consumer sends an event from the background 
> thread to the application thread when the group metadata is updated. Group 
> metadata is updated when the background thread receives a new assignment. 
> More specifically, the member epoch is updated each time a new assignment is 
> received, and the member ID is updated with the first assignment. 
> In contrast to the group metadata update, the assignment is directly set in 
> the subscription without sending an update event from the background thread 
> to the application thread. That means there is a delay between the 
> application thread becoming aware of the update to the assignment and the 
> application thread becoming aware of the update to the group metadata. This 
> behavior differs from the legacy consumer, where the assignment and 
> the group metadata are updated at the same time.
> We should make the update to the group metadata available to the application 
> thread when the update to the assignment is made available to the application 
> thread, so that the assignment and group metadata are in sync.
> For example, {{producer.sendOffsetsToTransaction(offsetsToCommit, 
> groupMetadata);}} benefits from this improvement, because if the offsets to 
> commit are consistent with the current assignment, the group metadata 
> would be as well. Currently, that is not guaranteed.
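For illustration, the consume-transform-produce pattern the example refers to (a sketch; it assumes a transactional producer on which initTransactions() has already been called):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

final class ExactlyOnceForwarder {
    static void forward(KafkaConsumer<String, String> consumer,
                        KafkaProducer<String, String> producer,
                        ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
        }
        producer.beginTransaction();
        // The offsets and the group metadata must describe the same assignment;
        // this ticket is about making consumer.groupMetadata() reflect the new
        // assignment at the same time the assignment itself becomes visible.
        producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
        producer.commitTransaction();
    }
}
```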



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2024-02-20 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818919#comment-17818919
 ] 

Ayoub Omari commented on KAFKA-14747:
-

Picking this up since [~kma] didn't respond :)

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response that might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> adding a new sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on PR #15357:
URL: https://github.com/apache/kafka/pull/15357#issuecomment-1954655740

   Thanks for the comments @lucasbru , all addressed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]

2024-02-20 Thread via GitHub


cadonna commented on code in PR #15369:
URL: https://github.com/apache/kafka/pull/15369#discussion_r1496180609


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -727,6 +732,17 @@ public ConsumerRecords poll(final Duration timeout) {
 }
 }
 
+private boolean isGenerationKnown() {

Review Comment:
   Probably, I can do something like
   ```java
   if (subscriptions.hasAutoAssignedPartitions()) {
       return groupMetadata.filter(g -> g.generationId() != JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent();
   }
   return true;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496179477


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1147,15 +1202,15 @@ List drain(final 
long currentTimeMs) {
  * futures with a TimeoutException.
  */
 private void failAndRemoveExpiredCommitRequests(final long 
currentTimeMs) {
-unsentOffsetCommits.removeIf(req -> 
req.maybeExpire(currentTimeMs));
+unsentOffsetCommits.forEach(req -> req.maybeExpire(currentTimeMs));

Review Comment:
   Definitely, good catch, fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]

2024-02-20 Thread via GitHub


cadonna commented on code in PR #15369:
URL: https://github.com/apache/kafka/pull/15369#discussion_r1496167714


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -727,6 +732,17 @@ public ConsumerRecords poll(final Duration timeout) {
 }
 }
 
+private boolean isGenerationKnown() {

Review Comment:
   I am not sure this is correct, because I can use user-assigned partitions 
but still use a group ID to manage the offsets broker-side, if I remember 
correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]

2024-02-20 Thread via GitHub


cadonna commented on code in PR #15369:
URL: https://github.com/apache/kafka/pull/15369#discussion_r1496159485


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -708,14 +709,18 @@ public ConsumerRecords poll(final Duration timeout) 
{
 wakeupTrigger.maybeTriggerWakeup();
 
 updateAssignmentMetadataIfNeeded(timer);
-final Fetch fetch = pollForFetches(timer);
-if (!fetch.isEmpty()) {
-if (fetch.records().isEmpty()) {
-log.trace("Returning empty records from `poll()` "
+if (isGenerationKnown()) {

Review Comment:
   The busy loop should not be too bad since the group metadata update event is 
added to the queue when also the new assignment is set in the membership 
manager.
   
   Moreover, this is kind of a temporary solution until 
https://issues.apache.org/jira/browse/KAFKA-16285 is done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]

2024-02-20 Thread via GitHub


cadonna commented on code in PR #15369:
URL: https://github.com/apache/kafka/pull/15369#discussion_r1496159485


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -708,14 +709,18 @@ public ConsumerRecords poll(final Duration timeout) 
{
 wakeupTrigger.maybeTriggerWakeup();
 
 updateAssignmentMetadataIfNeeded(timer);
-final Fetch fetch = pollForFetches(timer);
-if (!fetch.isEmpty()) {
-if (fetch.records().isEmpty()) {
-log.trace("Returning empty records from `poll()` "
+if (isGenerationKnown()) {

Review Comment:
   The busy loop should not be too bad since the group metadata update event is 
added to the queue when also the new assignment is set in the membership 
manager.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-20 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818903#comment-17818903
 ] 

Justine Olshan commented on KAFKA-16282:


I am also happy to review the KIP and PRs :) 

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time <timestamp /               timestamp of the offsets before that.
>   -1 or latest /                  [Note: No offset is returned, if the
>   -2 or earliest /                timestamp greater than recently
>   -3 or max-timestamp /           committed record timestamp is
>   -4 or earliest-local /          given.] (default: latest)
>   -5 or latest-tiered>
> {code}
> For the "latest" option, it will always return the "high watermark", because 
> we always send the request with the default option 
> *IsolationLevel.READ_UNCOMMITTED*. It would be good if the command could also 
> support getting the last stable offset (LSO) for transaction support, i.e. 
> sending the request with *IsolationLevel.READ_COMMITTED*.
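For comparison, this is roughly what "latest with READ_COMMITTED" looks like through the Admin API today, i.e. the semantics the ticket wants exposed by the tool (a sketch):

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class LsoLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // READ_COMMITTED makes the broker return the last stable offset (LSO)
            // for OffsetSpec.latest() instead of the high watermark.
            ListOffsetsResult result = admin.listOffsets(
                Map.of(tp, OffsetSpec.latest()),
                new ListOffsetsOptions(IsolationLevel.READ_COMMITTED));
            System.out.println("LSO: " + result.partitionResult(tp).get().offset());
        }
    }
}
```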



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-20 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496132259


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
   I'm not sure what he is supposed to change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496101641


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection request
 }
 
 /**
- * Generate a request to commit offsets if auto-commit is enabled. The 
request will be
- * returned to be sent out on the next call to {@link #poll(long)}. This 
will only generate a
- * request if there is no other commit request already in-flight, and if 
the commit interval
- * has elapsed.
+ * Generate a request to commit consumed offsets. Add the request to the 
queue of pending
+ * requests to be sent out on the next call to {@link #poll(long)}. If 
there are empty
+ * offsets to commit, no request will be generated and a completed future 
will be returned.
  *
- * @param offsets   Offsets to commit
- * @param expirationTimeMs  Time until which the request will continue to 
be retried if it
- *  fails with a retriable error. If not present, 
the request will be
- *  sent but not retried.
- * @param checkInterval True if the auto-commit interval expiration 
should be checked for
- *  sending a request. If true, the request will 
be sent only if the
- *  auto-commit interval has expired. Pass false to
- *  send the auto-commit request regardless of the 
interval (ex.
- *  auto-commit before rebalance).
- * @param retryOnStaleEpoch True if the request should be retried in case 
it fails with
- *  {@link Errors#STALE_MEMBER_EPOCH}.
- * @return Future that will complete when a response is received for the 
request, or a
- * completed future if no request is generated.
+ * @param requestState Commit request
+ * @return Future containing the offsets that were committed, or an error 
if the request
+ * failed.
  */
-private CompletableFuture maybeAutoCommit(final Map offsets,
-final Optional 
expirationTimeMs,
-boolean checkInterval,
-boolean retryOnStaleEpoch) 
{
-if (!autoCommitEnabled()) {
-log.debug("Skipping auto-commit because auto-commit config is not 
enabled.");
-return CompletableFuture.completedFuture(null);
-}
-
+private CompletableFuture> 
requestAutoCommit(final OffsetCommitRequestState requestState) {
 AutoCommitState autocommit = autoCommitState.get();
-if (checkInterval && !autocommit.shouldAutoCommit()) {
-return CompletableFuture.completedFuture(null);
+CompletableFuture> result;
+if (requestState.offsets.isEmpty()) {
+result = CompletableFuture.completedFuture(Collections.emptyMap());
+} else {
+autocommit.setInflightCommitStatus(true);
+OffsetCommitRequestState request = 
pendingRequests.addOffsetCommitRequest(requestState);
+result = request.future;
+result.whenComplete(autoCommitCallback(request.offsets));
 }
-
-autocommit.resetTimer();
-autocommit.setInflightCommitStatus(true);
-CompletableFuture result = addOffsetCommitRequest(offsets, 
expirationTimeMs, retryOnStaleEpoch)
-.whenComplete(autoCommitCallback(offsets));
 return result;
 }
 
 /**
- * If auto-commit is enabled, this will generate a commit offsets request 
for all assigned
- * partitions and their current positions. Note on auto-commit timers: 
this will reset the
- * auto-commit timer to the interval before issuing the async commit, and 
when the async commit
- * completes, it will reset the auto-commit timer with the exponential 
backoff if the request
- * failed with a retriable error.
- *
- * @return Future that will complete when a response is received for the 
request, or a
- * completed future if no request is generated.
+ * If auto-commit is enabled, and the auto-commit interval has expired, 
this will generate and
+ * enqueue a request to commit all consumed offsets, and will reset the 
auto-commit timer to the
+ * interval. The request will be sent on the next call to {@link 
#poll(long)}.
+ * 
+ * If the request completes with a retriable error, this will reset the 
auto-commit timer with
+ * the exponential backoff. If it fails with a non-retriable error, no 
action is taken, so
+ * the next commit will be generated when the interval expires.
  */
-public CompletableFuture maybeAutoCommitAllConsumedAsync() {
-if (!autoCommitEnabled()) {
-// Early return to 

Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496091256


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -719,26 +846,27 @@ NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final Abstr
 
 private void handleClientResponse(final ClientResponse response,
   final Throwable error,
-  final long currentTimeMs) {
+  final long requestCompletionTimeMs) {
 try {
 if (error == null) {
 onResponse(response);
 } else {
 log.debug("{} completed with error", requestDescription(), 
error);
-handleCoordinatorDisconnect(error, currentTimeMs);
-if (error instanceof RetriableException) {
-maybeRetry(currentTimeMs, error);
-} else {
-future().completeExceptionally(error);
-}
+onFailedAttempt(requestCompletionTimeMs);
+handleCoordinatorDisconnect(error, 
requestCompletionTimeMs);
+future().completeExceptionally(error);
 }
 } catch (Throwable t) {
-log.error("Unexpected error handling response for ", 
requestDescription(), t);
+log.error("Unexpected error handling response for {}", 
requestDescription(), t);
 future().completeExceptionally(t);
 }
 }
 
 abstract void onResponse(final ClientResponse response);
+
+abstract boolean retryTimeoutExpired(long currentTimeMs);

Review Comment:
   Good point, done. I missed that, probably influenced by the fact that, in the 
case of the commits, the expirationTimeout does not apply to all requests. But I 
do agree with you that the expiration belongs with the retry logic, so it does 
fit well at the RetriableRequest level (just Optional).
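   For illustration, a rough sketch of that shape (hypothetical class, not the actual CommitRequestManager types): the optional expiration lives with the retry state, and requests without one simply never expire.

```java
import java.util.Optional;

abstract class RetriableRequest {
    private final Optional<Long> expirationTimeMs; // empty => retried indefinitely

    protected RetriableRequest(Optional<Long> expirationTimeMs) {
        this.expirationTimeMs = expirationTimeMs;
    }

    boolean retryTimeoutExpired(long currentTimeMs) {
        return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs;
    }
}
```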



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496065426


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection request
 }
 
 /**
- * Generate a request to commit offsets if auto-commit is enabled. The 
request will be
- * returned to be sent out on the next call to {@link #poll(long)}. This 
will only generate a
- * request if there is no other commit request already in-flight, and if 
the commit interval
- * has elapsed.
+ * Generate a request to commit consumed offsets. Add the request to the 
queue of pending
+ * requests to be sent out on the next call to {@link #poll(long)}. If 
there are empty
+ * offsets to commit, no request will be generated and a completed future 
will be returned.
  *
- * @param offsets   Offsets to commit
- * @param expirationTimeMs  Time until which the request will continue to 
be retried if it
- *  fails with a retriable error. If not present, 
the request will be
- *  sent but not retried.
- * @param checkInterval True if the auto-commit interval expiration 
should be checked for
- *  sending a request. If true, the request will 
be sent only if the
- *  auto-commit interval has expired. Pass false to
- *  send the auto-commit request regardless of the 
interval (ex.
- *  auto-commit before rebalance).
- * @param retryOnStaleEpoch True if the request should be retried in case 
it fails with
- *  {@link Errors#STALE_MEMBER_EPOCH}.
- * @return Future that will complete when a response is received for the 
request, or a
- * completed future if no request is generated.
+ * @param requestState Commit request
+ * @return Future containing the offsets that were committed, or an error 
if the request
+ * failed.
  */
-private CompletableFuture maybeAutoCommit(final Map offsets,
-final Optional 
expirationTimeMs,
-boolean checkInterval,
-boolean retryOnStaleEpoch) 
{
-if (!autoCommitEnabled()) {
-log.debug("Skipping auto-commit because auto-commit config is not 
enabled.");
-return CompletableFuture.completedFuture(null);
-}
-
+private CompletableFuture> 
requestAutoCommit(final OffsetCommitRequestState requestState) {
 AutoCommitState autocommit = autoCommitState.get();
-if (checkInterval && !autocommit.shouldAutoCommit()) {
-return CompletableFuture.completedFuture(null);
+CompletableFuture> result;
+if (requestState.offsets.isEmpty()) {
+result = CompletableFuture.completedFuture(Collections.emptyMap());
+} else {
+autocommit.setInflightCommitStatus(true);
+OffsetCommitRequestState request = 
pendingRequests.addOffsetCommitRequest(requestState);
+result = request.future;
+result.whenComplete(autoCommitCallback(request.offsets));
 }
-
-autocommit.resetTimer();
-autocommit.setInflightCommitStatus(true);
-CompletableFuture result = addOffsetCommitRequest(offsets, 
expirationTimeMs, retryOnStaleEpoch)
-.whenComplete(autoCommitCallback(offsets));
 return result;
 }
 
 /**
- * If auto-commit is enabled, this will generate a commit offsets request 
for all assigned
- * partitions and their current positions. Note on auto-commit timers: 
this will reset the
- * auto-commit timer to the interval before issuing the async commit, and 
when the async commit
- * completes, it will reset the auto-commit timer with the exponential 
backoff if the request
- * failed with a retriable error.
- *
- * @return Future that will complete when a response is received for the 
request, or a
- * completed future if no request is generated.
+ * If auto-commit is enabled, and the auto-commit interval has expired, 
this will generate and
+ * enqueue a request to commit all consumed offsets, and will reset the 
auto-commit timer to the
+ * interval. The request will be sent on the next call to {@link 
#poll(long)}.
+ * 
+ * If the request completes with a retriable error, this will reset the 
auto-commit timer with
+ * the exponential backoff. If it fails with a non-retriable error, no 
action is taken, so
+ * the next commit will be generated when the interval expires.
  */
-public CompletableFuture maybeAutoCommitAllConsumedAsync() {
-if (!autoCommitEnabled()) {
-// Early return to 

Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496060942


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection request
 }
 
 /**
- * Generate a request to commit offsets if auto-commit is enabled. The 
request will be
- * returned to be sent out on the next call to {@link #poll(long)}. This 
will only generate a
- * request if there is no other commit request already in-flight, and if 
the commit interval
- * has elapsed.
+ * Generate a request to commit consumed offsets. Add the request to the 
queue of pending
+ * requests to be sent out on the next call to {@link #poll(long)}. If 
there are empty
+ * offsets to commit, no request will be generated and a completed future 
will be returned.
  *
- * @param offsets   Offsets to commit
- * @param expirationTimeMs  Time until which the request will continue to 
be retried if it
- *  fails with a retriable error. If not present, 
the request will be
- *  sent but not retried.
- * @param checkInterval True if the auto-commit interval expiration 
should be checked for
- *  sending a request. If true, the request will 
be sent only if the
- *  auto-commit interval has expired. Pass false to
- *  send the auto-commit request regardless of the 
interval (ex.
- *  auto-commit before rebalance).
- * @param retryOnStaleEpoch True if the request should be retried in case 
it fails with
- *  {@link Errors#STALE_MEMBER_EPOCH}.
- * @return Future that will complete when a response is received for the 
request, or a
- * completed future if no request is generated.
+ * @param requestState Commit request
+ * @return Future containing the offsets that were committed, or an error 
if the request
+ * failed.
  */
-private CompletableFuture maybeAutoCommit(final Map offsets,
-final Optional 
expirationTimeMs,
-boolean checkInterval,
-boolean retryOnStaleEpoch) 
{
-if (!autoCommitEnabled()) {
-log.debug("Skipping auto-commit because auto-commit config is not 
enabled.");
-return CompletableFuture.completedFuture(null);
-}
-
+private CompletableFuture> 
requestAutoCommit(final OffsetCommitRequestState requestState) {
 AutoCommitState autocommit = autoCommitState.get();
-if (checkInterval && !autocommit.shouldAutoCommit()) {
-return CompletableFuture.completedFuture(null);
+CompletableFuture> result;
+if (requestState.offsets.isEmpty()) {
+result = CompletableFuture.completedFuture(Collections.emptyMap());
+} else {
+autocommit.setInflightCommitStatus(true);
+OffsetCommitRequestState request = 
pendingRequests.addOffsetCommitRequest(requestState);
+result = request.future;
+result.whenComplete(autoCommitCallback(request.offsets));
 }
-
-autocommit.resetTimer();
-autocommit.setInflightCommitStatus(true);
-CompletableFuture result = addOffsetCommitRequest(offsets, 
expirationTimeMs, retryOnStaleEpoch)
-.whenComplete(autoCommitCallback(offsets));
 return result;
 }
 
 /**
- * If auto-commit is enabled, this will generate a commit offsets request 
for all assigned
- * partitions and their current positions. Note on auto-commit timers: 
this will reset the
- * auto-commit timer to the interval before issuing the async commit, and 
when the async commit
- * completes, it will reset the auto-commit timer with the exponential 
backoff if the request
- * failed with a retriable error.
- *
- * @return Future that will complete when a response is received for the 
request, or a
- * completed future if no request is generated.
+ * If auto-commit is enabled, and the auto-commit interval has expired, 
this will generate and
+ * enqueue a request to commit all consumed offsets, and will reset the 
auto-commit timer to the
+ * interval. The request will be sent on the next call to {@link 
#poll(long)}.
+ * 
+ * If the request completes with a retriable error, this will reset the 
auto-commit timer with
+ * the exponential backoff. If it fails with a non-retriable error, no 
action is taken, so
+ * the next commit will be generated when the interval expires.
  */
-public CompletableFuture maybeAutoCommitAllConsumedAsync() {
-if (!autoCommitEnabled()) {
-// Early return to 

Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496049319


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) {
 }
 
 CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-Optional expirationTimeMs = 
event.retryTimeoutMs().map(this::getExpirationTimeForTimeout);
-event.chain(manager.addOffsetCommitRequest(event.offsets(), 
expirationTimeMs, false));
+CompletableFuture commitResult;
+if (event.retryTimeoutMs().isPresent()) {

Review Comment:
   Say no more, I was leaning towards that too (just a bit hesitant to 
introduce more into this already-big PR), but I totally agree that it's 
conceptually clearer. The result is nice, done.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1344,8 +1343,8 @@ public void commitSync(Map offsets, Duration
 long commitStart = time.nanoseconds();
 try {
 Timer requestTimer = time.timer(timeout.toMillis());
-// Commit with a timer to control how long the request should be 
retried until it
-// gets a successful response or non-retriable error.
+// Commit with a retry timeout (the commit request will be retried 
until it gets a
+// successful response, non-retriable error, or the timeout 
expires)
 CompletableFuture commitFuture = commit(offsets, true, 
Optional.of(timeout.toMillis()));

Review Comment:
   Makes sense, done. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-02-20 Thread Hector Geraldino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818856#comment-17818856
 ] 

Hector Geraldino commented on KAFKA-16223:
--

One thing that worked for me when migrating WorkerSinkTaskTest was creating 
a separate *WorkerSinkTaskMockTest* test class and splitting the migration into 
smaller batches.

Is that something you'd consider?
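
For illustration, a minimal Mockito-style test of the kind that could live in
such a separate class (the class name and the collaborator interface below are
hypothetical, not the real KafkaConfigBackingStore internals):

{code:java}
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;

public class KafkaConfigBackingStoreMockTest {

    // Stand-in collaborator, only for the sketch.
    interface ConfigReader {
        String get(String key);
    }

    @Test
    public void shouldReturnStubbedValue() {
        // Mockito stubbing replaces EasyMock's expect(...).andReturn(...)/replay()/verify() cycle.
        ConfigReader reader = mock(ConfigReader.class);
        when(reader.get("connector.name")).thenReturn("my-connector");

        assertEquals("my-connector", reader.get("connector.name"));
    }
}
{code}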

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-20 Thread via GitHub


msn-tldr commented on PR #15385:
URL: https://github.com/apache/kafka/pull/15385#issuecomment-1954448982

   All the Jenkins test failures are in different tests, which are already known 
to have a flaky history.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]

2024-02-20 Thread via GitHub


cadonna commented on code in PR #15369:
URL: https://github.com/apache/kafka/pull/15369#discussion_r1495859702


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1467,6 +1483,7 @@ public void unsubscribe() {
 } catch (TimeoutException e) {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
+groupMetadata = 
initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty());

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16284: Fix performance regression in RocksDB [kafka]

2024-02-20 Thread via GitHub


lucasbru commented on PR #15393:
URL: https://github.com/apache/kafka/pull/15393#issuecomment-1954244163

   @cadonna could you take a look? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]

2024-02-20 Thread via GitHub


lucasbru commented on code in PR #15369:
URL: https://github.com/apache/kafka/pull/15369#discussion_r1495811911


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -98,6 +99,8 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;

Review Comment:
   not used



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -727,6 +732,17 @@ public ConsumerRecords poll(final Duration timeout) {
 }
 }
 
+private boolean isGenerationKnown() {

Review Comment:
   How about this one-liner
   
   ```
   return groupMetadata.filter(g -> g.generationId() != 
JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent();
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -708,14 +709,18 @@ public ConsumerRecords poll(final Duration timeout) 
{
 wakeupTrigger.maybeTriggerWakeup();
 
 updateAssignmentMetadataIfNeeded(timer);
-final Fetch fetch = pollForFetches(timer);
-if (!fetch.isEmpty()) {
-if (fetch.records().isEmpty()) {
-log.trace("Returning empty records from `poll()` "
+if (isGenerationKnown()) {

Review Comment:
   If this check is false... where do we block? It seems like we busy loop with 
100% CPU, but maybe I'm wrong



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1467,6 +1483,7 @@ public void unsubscribe() {
 } catch (TimeoutException e) {
 log.error("Failed while waiting for the unsubscribe event 
to complete");
 }
+groupMetadata = 
initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty());

Review Comment:
   This looks like an independent fix? Maybe mention in the PR description



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16284: Fix performance regression in RocksDB [kafka]

2024-02-20 Thread via GitHub


lucasbru opened a new pull request, #15393:
URL: https://github.com/apache/kafka/pull/15393

   A performance regression introduced in commit 
5bc3aa428067dff1f2b9075ff5d1351fb05d4b10 reduces the write performance in 
RocksDB by ~3x. The bug is that we fail to pass the `WriteOptions` that disable 
the write-ahead log into the DB accessor.
   
   For testing, the time to write 1 million records into each of 10 RocksDB 
stores was measured, two runs per configuration:
   
* Before 5bc3aa428067dff1f2b9075ff5d1351fb05d4b10: 7954ms, 12933ms
* After 5bc3aa428067dff1f2b9075ff5d1351fb05d4b10: 30345ms, 31992ms
* After 5bc3aa428067dff1f2b9075ff5d1351fb05d4b10 with this fix: 8040ms, 
10563ms
* On current trunk with this fix:  9508ms, 10441ms
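
   For reference, a minimal standalone sketch of writing to RocksDB with the
   write-ahead log disabled via `WriteOptions` (paths and values are made up;
   this shows the general RocksDB Java API, not the Streams DB accessor itself):

   ```java
   import org.rocksdb.Options;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;
   import org.rocksdb.WriteOptions;

   public class DisableWalWriteSketch {
       public static void main(String[] args) throws RocksDBException {
           RocksDB.loadLibrary();
           try (Options options = new Options().setCreateIfMissing(true);
                WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);
                RocksDB db = RocksDB.open(options, "/tmp/rocksdb-wal-sketch")) {
               // Passing the WriteOptions disables the write-ahead log for this write;
               // calling db.put(key, value) instead would use default WriteOptions
               // with the WAL enabled, which is the costly path described above.
               db.put(writeOptions, "key".getBytes(), "value".getBytes());
           }
       }
   }
   ```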

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16284) Performance regression in RocksDB

2024-02-20 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16284:
--

Assignee: Lucas Brutschy

> Performance regression in RocksDB
> -
>
> Key: KAFKA-16284
> URL: https://issues.apache.org/jira/browse/KAFKA-16284
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> In benchmarks, we are noticing a performance regression in `RocksDBStore`.
> The regression happens between those two commits:
>  
> {code:java}
> trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z
> trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z
> {code}
> The regression can be reproduced by the following test:
>  
> {code:java}
> package org.apache.kafka.streams.state.internals;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.processor.StateStoreContext;
> import org.apache.kafka.test.InternalMockProcessorContext;
> import org.apache.kafka.test.MockRocksDbConfigSetter;
> import org.apache.kafka.test.StreamsTestUtils;
> import org.apache.kafka.test.TestUtils;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.File;
> import java.nio.ByteBuffer;
> import java.util.Properties;
> public class RocksDBStorePerfTest {
> InternalMockProcessorContext context;
> RocksDBStore rocksDBStore;
> final static String DB_NAME = "db-name";
> final static String METRICS_SCOPE = "metrics-scope";
> RocksDBStore getRocksDBStore() {
> return new RocksDBStore(DB_NAME, METRICS_SCOPE);
> }
> @Before
> public void setUp() {
> final Properties props = StreamsTestUtils.getStreamsConfig();
> props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
> MockRocksDbConfigSetter.class);
> File dir = TestUtils.tempDirectory();
> context = new InternalMockProcessorContext<>(
> dir,
> Serdes.String(),
> Serdes.String(),
> new StreamsConfig(props)
> );
> }
> @Test
> public void testPerf() {
> long start = System.currentTimeMillis();
> for (int i = 0; i < 10; i++) {
> System.out.println("Iteration: "+i+" Time: " + 
> (System.currentTimeMillis() - start));
> RocksDBStore rocksDBStore = getRocksDBStore();
> rocksDBStore.init((StateStoreContext) context, rocksDBStore);
> for (int j = 0; j < 100; j++) {
> rocksDBStore.put(new 
> Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes());
> }
> rocksDBStore.close();
> }
> long end = System.currentTimeMillis();
> System.out.println("Time: " + (end - start));
> }
> }
>  {code}
>  
> I have isolated the regression to commit 
> [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10].
>  On my machine, the test takes ~8 seconds before 
> [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]
>  and ~30 seconds after 
> [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0 [kafka]

2024-02-20 Thread via GitHub


dejan2609 commented on PR #10626:
URL: https://github.com/apache/kafka/pull/10626#issuecomment-1954154725

   Hi @mimaison ! I will return to this once Kafka drops support for Java 8.
   
   Reference JIRA ticket:  
   **_KIP-750: Drop support for Java 8 in Kafka 4.0 (deprecate in 3.0)_** 
https://issues.apache.org/jira/browse/KAFKA-12894 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer

2024-02-20 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16285:
--
Description: 
Currently, the new async Kafka consumer sends an event from the background 
thread to the application thread when the group metadata is updated. Group 
metadata is updated when the background thread receives a new assignment. More 
specifically, the member epoch is updated each time a new assignment is 
received, and the member ID is updated with the first assignment. 
In contrast to the group metadata update, the assignment is directly set in the 
subscription without sending an update event from the background thread to the 
application thread. That means that there is a delay between the application 
thread becoming aware of the update to the assignment and the application 
thread becoming aware of the update to the group metadata. This behavior 
differs from the legacy consumer, where the assignment and the group metadata 
are updated at the same time.
We should make the update to the group metadata available to the application 
thread when the update to the assignment is made available to the application 
thread, so that the assignment and group metadata are in sync.

For example, {{producer.sendOffsetsToTransaction(offsetsToCommit, 
groupMetadata);}} benefits from this improvement because if the offsets to 
commit are consistent with the current assignment, the group metadata would be 
as well. Currently, that is not guaranteed. 

  was:
Currently, the new async Kafka consumer sends an event from the background 
thread to the application thread when the group metadata is updated. Group 
metadata is updated when the background thread receives a new assignment. More 
specifically, the member epoch is updated each time a new assignment is 
received, and the member ID is updated with the first assignment. 
In contrast to the group metadata update, the assignment is directly set in the 
subscription without sending an update event from the background thread to the 
application thread. That means that there is a delay between the application 
thread becoming aware of the update to the assignment and the application 
thread becoming aware of the update to the group metadata. This behavior 
differs from the legacy consumer, where the assignment and the group metadata 
are updated at the same time.
We should make the update to the group metadata available to the application 
thread when the update to the assignment is made available to the application 
thread, so that the assignment and group metadata are in sync. 


> Make group metadata available when a new assignment is set in async Kafka 
> consumer
> --
>
> Key: KAFKA-16285
> URL: https://issues.apache.org/jira/browse/KAFKA-16285
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: kip-848-client-support
>
> Currently, the new async Kafka consumer sends an event from the background 
> thread to the application thread when the group metadata is updated. Group 
> metadata is updated when the background thread receives a new assignment. 
> More specifically, the member epoch is updated each time a new assignment is 
> received, and the member ID is updated with the first assignment. 
> In contrast to the group metadata update, the assignment is directly set in 
> the subscription without sending an update event from the background thread 
> to the application thread. That means that there is a delay between the 
> application thread becoming aware of the update to the assignment and the 
> application thread becoming aware of the update to the group metadata. This 
> behavior differs from the legacy consumer, where the assignment and the group 
> metadata are updated at the same time.
> We should make the update to the group metadata available to the application 
> thread when the update to the assignment is made available to the application 
> thread, so that the assignment and group metadata are in sync.
> For example, {{producer.sendOffsetsToTransaction(offsetsToCommit, 
> groupMetadata);}} benefits from this improvement because if the offsets to 
> commit are consistent with the current assignment, the group metadata would 
> be as well. Currently, that is not guaranteed. 
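
For illustration, a minimal sketch of the consume-transform-produce commit step
that the example above refers to (method and variable names are illustrative;
the sketch assumes a transactional producer with an open transaction):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class TransactionalCommitSketch {

    // Commits the consumed offsets within the producer's current transaction.
    public static <K, V> void commitWithinTransaction(KafkaProducer<K, V> producer,
                                                      KafkaConsumer<K, V> consumer,
                                                      Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
        // The group metadata read here should reflect the same assignment that
        // produced offsetsToCommit; if it lags behind, the broker can reject the
        // commit, which is the inconsistency this ticket wants to rule out.
        ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
        producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
    }
}
{code}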



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer

2024-02-20 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16285:
-

 Summary: Make group metadata available when a new assignment is 
set in async Kafka consumer
 Key: KAFKA-16285
 URL: https://issues.apache.org/jira/browse/KAFKA-16285
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Bruno Cadonna


Currently, the new async Kafka consumer sends an event from the background 
thread to the application thread when the group metadata is updated. Group 
metadata is updated when the background thread receives a new assignment. More 
specifically, the member epoch is updated each time a new assignment is 
received, and the member ID is updated with the first assignment. 
In contrast to the group metadata update, the assignment is directly set in the 
subscription without sending an update event from the background thread to the 
application thread. That means that there is a delay between the application 
thread becoming aware of the update to the assignment and the application 
thread becoming aware of the update to the group metadata. This behavior 
differs from the legacy consumer, where the assignment and the group metadata 
are updated at the same time.
We should make the update to the group metadata available to the application 
thread when the update to the assignment is made available to the application 
thread, so that the assignment and group metadata are in sync. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic

2024-02-20 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818794#comment-17818794
 ] 

Luke Chen commented on KAFKA-16283:
---

[~alivshits], any thoughts on this?

> RoundRobinPartitioner will only send to half of the partitions in a topic
> -
>
> Key: KAFKA-16283
> URL: https://issues.apache.org/jira/browse/KAFKA-16283
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Priority: Major
>
> When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we 
> expect data to be sent to all partitions in a round-robin manner. But we found 
> that only half of the partitions got the data. This causes half of the 
> resources (storage, consumers, ...) to be wasted.
> {code:java}
> > bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> > localhost:9092 --partitions 2 
> Created topic quickstart-events4.
> # send 10 records to the topic, expecting 5 records in partition0, and 5 
> records in partition1
> > bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 
> > --record-size 100 --throughput -1 --producer-props 
> > bootstrap.servers=localhost:9092 
> > partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
> 10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 
> 132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.
> > ls -al /tmp/kafka-logs/quickstart-events4-0
> total 24
> drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
> drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
> -rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
> -rw-r--r--   1 lukchen  wheel  1151  2 20 19:53 .log
> -rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
> .timeindex
> -rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
> -rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
> # No records in partition 1
> > ls -al /tmp/kafka-logs/quickstart-events4-1
> total 8
> drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
> drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
> -rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
> -rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
> -rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
> .timeindex
> -rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
> -rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
> {code}
> Had a quick look; it's because we abortOnNewBatch each time a new batch is 
> created. 
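
For reference, a minimal Java reproduction of the same expectation as the perf
test above (broker address and topic name are made up):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Keyless records: with RoundRobinPartitioner the expectation is an
                // even spread across all partitions of the topic.
                producer.send(new ProducerRecord<>("quickstart-events4", "value-" + i));
            }
            producer.flush();
        }
    }
}
{code}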



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16284) Performance regression in RocksDB

2024-02-20 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16284:
--

 Summary: Performance regression in RocksDB
 Key: KAFKA-16284
 URL: https://issues.apache.org/jira/browse/KAFKA-16284
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Lucas Brutschy


In benchmarks, we are noticing a performance regression in `RocksDBStore`.

The regression happens between those two commits:
 
{code:java}
trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z
trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z
{code}

The regression can be reproduced by the following test:
 
{code:java}
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Properties;

public class RocksDBStorePerfTest {

InternalMockProcessorContext context;
RocksDBStore rocksDBStore;
final static String DB_NAME = "db-name";
final static String METRICS_SCOPE = "metrics-scope";

RocksDBStore getRocksDBStore() {
return new RocksDBStore(DB_NAME, METRICS_SCOPE);
}
@Before
public void setUp() {
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
MockRocksDbConfigSetter.class);
File dir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext<>(
dir,
Serdes.String(),
Serdes.String(),
new StreamsConfig(props)
);
}

@Test
public void testPerf() {
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
System.out.println("Iteration: "+i+" Time: " + 
(System.currentTimeMillis() - start));
RocksDBStore rocksDBStore = getRocksDBStore();
rocksDBStore.init((StateStoreContext) context, rocksDBStore);
for (int j = 0; j < 100; j++) {
rocksDBStore.put(new 
Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes());
}
rocksDBStore.close();
}
long end = System.currentTimeMillis();
System.out.println("Time: " + (end - start));
}

}
 {code}
 
I have isolated the regression to commit 
[5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10].
 On my machine, the test takes ~8 seconds before 
[5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]
 and ~30 seconds after 
[5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10].




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [WIP] KAFKA-13566: producer exponential backoff implementation for KIP-580 [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #11646:
URL: https://github.com/apache/kafka/pull/11646#issuecomment-1954090264

   Done in https://github.com/apache/kafka/pull/14111, closing this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] KAFKA-13566: producer exponential backoff implementation for KIP-580 [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #11646: [WIP] KAFKA-13566: producer exponential 
backoff implementation for KIP-580
URL: https://github.com/apache/kafka/pull/11646


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] KAFKA-13567: adminClient exponential backoff implementation for KIP-580 [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #11647: [WIP] KAFKA-13567: adminClient exponential 
backoff implementation for KIP-580
URL: https://github.com/apache/kafka/pull/11647


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] KAFKA-13567: adminClient exponential backoff implementation for KIP-580 [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #11647:
URL: https://github.com/apache/kafka/pull/11647#issuecomment-1954090079

   Done in https://github.com/apache/kafka/pull/14111, closing this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add kafka topic sync offset lag metrics by JMX [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #11387:
URL: https://github.com/apache/kafka/pull/11387#issuecomment-1954086721

   This is being done by KIP-971, so closing this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add kafka topic sync offset lag metrics by JMX [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #11387: Add kafka topic sync offset lag metrics by 
JMX
URL: https://github.com/apache/kafka/pull/11387


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-20 Thread via GitHub


cadonna commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1495579840


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1730,6 +1744,21 @@ private void subscribeInternal(Pattern pattern, 
Optional listener) {
+acquireAndEnsureOpen();
+try {
+maybeThrowInvalidGroupIdException();
+if (pattern == null || pattern.pattern().isEmpty())

Review Comment:
   I think we should move the check that the pattern is non-empty (and maybe some 
other validity checks) into the `SubscriptionPattern` class. It should not be 
possible to create an instance of `SubscriptionPattern` with an empty pattern.
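
   For illustration, a minimal sketch of such constructor-side validation (this
   is not the actual SubscriptionPattern class, whose API may differ):

   ```java
   public class SubscriptionPattern {

       private final String pattern;

       public SubscriptionPattern(final String pattern) {
           // Reject null/empty patterns at construction time so that no caller can
           // ever hold an invalid SubscriptionPattern instance.
           if (pattern == null || pattern.isEmpty()) {
               throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null or empty");
           }
           this.pattern = pattern;
       }

       public String pattern() {
           return pattern;
       }
   }
   ```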



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) {
 delegate.subscribe(pattern);
 }
 
+/**
+ * Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
+ * The pattern matching will be done periodically against all topics 
existing at the time of check.
+ * This can be controlled through the {@code metadata.max.age.ms} 
configuration: by lowering
+ * the max metadata age, the consumer will refresh metadata more often and 
check for matching topics.

Review Comment:
   I think this is not correct with the new pattern subscription since the 
assignment is done broker-side.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -84,6 +85,9 @@ private enum SubscriptionType {
 /* the pattern user has requested */
 private Pattern subscribedPattern;
 
+/* RE2J compatible regex */
+private SubscriptionPattern subscriptionPattern;

Review Comment:
   What about having a class that contains either a `SubscriptionPattern` or a 
`Pattern`?
   Something like :
   
   ```java
   public class JavaPatternOrSubscriptionPattern {
   
       private final Pattern javaPattern;
   
       private final SubscriptionPattern subscriptionPattern;
   
       private JavaPatternOrSubscriptionPattern(final Pattern pattern) {
           this.javaPattern = pattern;
           this.subscriptionPattern = null;
       }
   
       private JavaPatternOrSubscriptionPattern(final SubscriptionPattern pattern) {
           this.javaPattern = null;
           this.subscriptionPattern = pattern;
       }
   
       public static JavaPatternOrSubscriptionPattern javaPattern(final Pattern pattern) {
           return new JavaPatternOrSubscriptionPattern(pattern);
       }
   
       public static JavaPatternOrSubscriptionPattern subscriptionPattern(final SubscriptionPattern pattern) {
           return new JavaPatternOrSubscriptionPattern(pattern);
       }
   
       public String pattern() {
           return subscriptionPattern != null ? subscriptionPattern.pattern() : javaPattern.pattern();
       }
   
       public String toString() {
           return "pattern = " + pattern();
       }
   
       ...
   }
   ```
   
   In such a way we would encapsulate the code that ensures that only one of the 
two is set.



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) {
 delegate.subscribe(pattern);
 }
 
+/**
+ * Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
+ * The pattern matching will be done periodically against all topics 
existing at the time of check.
+ * This can be controlled through the {@code metadata.max.age.ms} 
configuration: by lowering
+ * the max metadata age, the consumer will refresh metadata more often and 
check for matching topics.
+ * 
+ * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for 
details on the
+ * use of the {@link ConsumerRebalanceListener}. Generally rebalances are 
triggered when there
+ * is a change to the topics matching the provided pattern and when 
consumer group membership changes.
+ * Group rebalances only take place during an active call to {@link 
#poll(Duration)}.

Review Comment:
   This is also no longer correct with KIP-848, which introduces this method.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -495,6 +496,16 @@ public void subscribe(Pattern pattern) {
 subscribeInternal(pattern, Optional.empty());
 }
 
+@Override
+public void subscribe(SubscriptionPattern pattern, 
ConsumerRebalanceListener callback) {
+log.warn("Operation not supported in new consumer group protocol");
+}
+
+@Override
+public void subscribe(SubscriptionPattern pattern) {
+log.warn("Operation not supported in new consumer group protocol");

Review Comment:
   See above



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:

Re: [PR] KAFKA-12891: KIP-749 Add --files and --file-separator options to the ConsoleProducer (WIP) [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #10889: KAFKA-12891: KIP-749 Add --files and 
--file-separator options to the ConsoleProducer (WIP)
URL: https://github.com/apache/kafka/pull/10889


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12891: KIP-749 Add --files and --file-separator options to the ConsoleProducer (WIP) [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #10889:
URL: https://github.com/apache/kafka/pull/10889#issuecomment-1954080126

   KIP-749 was never voted and is now abandoned so I'll close this PR. Feel 
free to restart the discussion on the KIP if it's still something you want to 
do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic

2024-02-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16283:
--
Description: 
When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect 
data to be sent to all partitions in a round-robin manner. But we found that 
only half of the partitions got the data. This causes half of the 
resources (storage, consumers, ...) to be wasted.


{code:java}
> bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> localhost:9092 --partitions 2 

Created topic quickstart-events4.

# send 10 records to the topic, expecting 5 records in partition0, and 5 
records in partition1
> bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 
> --record-size 100 --throughput -1 --producer-props 
> bootstrap.servers=localhost:9092 
> partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 
132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.

> ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel  1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata

# No records in partition 1
> ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
{code}

Had a quick look; it's because we abortOnNewBatch each time a new batch is 
created. 

  was:
When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect 
data to be sent to all partitions in a round-robin manner. But we found that 
only half of the partitions got the data. This causes half of the 
resources (storage, consumers, ...) to be wasted.


{code:java}
> bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> localhost:9092 --partitions 2 

Created topic quickstart-events4.

# send 10 records to the topic, expecting 5 records in partition0, and 5 
records in partition1
> bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 
> --record-size 100 --throughput -1 --producer-props 
> bootstrap.servers=localhost:9092 
> partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 
132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.

> ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel  1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata

# No records in partition 1
> ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
{code}



> RoundRobinPartitioner will only send to half of the partitions in a topic
> -
>
> Key: KAFKA-16283
> URL: https://issues.apache.org/jira/browse/KAFKA-16283
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Priority: Major
>
> When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we 
> expect data to be sent to all partitions in a round-robin manner. But we found 
> that only 

Re: [PR] KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0 [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #10626:
URL: https://github.com/apache/kafka/pull/10626#issuecomment-1954070991

   We can't upgrade to argparse 0.9.0 now as it requires Java 9. Closing this 
PR for now, we can revisit it once Kafka drops support for Java 8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic

2024-02-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16283:
--
Description: 
When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect 
data to be sent to all partitions in a round-robin manner. But we found that 
only half of the partitions got the data. This causes half of the 
resources (storage, consumers, ...) to be wasted.


{code:java}
> bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> localhost:9092 --partitions 2 

Created topic quickstart-events4.

# send 10 records to the topic, expecting 5 records in partition0, and 5 
records in partition1
> bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 
> --record-size 100 --throughput -1 --producer-props 
> bootstrap.servers=localhost:9092 
> partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 
132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.

> ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel  1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata

# No records in partition 1
> ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
{code}


  was:
When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect 
data to be sent to all partitions in a round-robin manner. But we found that 
only half of the partitions got the data. This causes half of the 
resources (storage, consumers, ...) to be wasted.


{code:java}
> bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> localhost:9092 --partitions 2 

Created topic quickstart-events4.

> bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 
> --record-size 100 --throughput -1 --producer-props 
> bootstrap.servers=localhost:9092 
> partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 
132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.

> ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel  1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata

> ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
{code}



> RoundRobinPartitioner will only send to half of the partitions in a topic
> -
>
> Key: KAFKA-16283
> URL: https://issues.apache.org/jira/browse/KAFKA-16283
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Priority: Major
>
> When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we 
> expect data to be sent to all partitions in a round-robin manner. But we found 
> that only half of the partitions got the data. This causes half of the 
> resources (storage, consumers, ...) to be wasted.
> {code:java}
> > bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> > 

Re: [PR] KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0 [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #10626: KAFKA-12744: Breaking change dependency 
upgrade: "argparse4j" 0.7.0 -->> 0.9.0
URL: https://github.com/apache/kafka/pull/10626


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic

2024-02-20 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16283:
-

 Summary: RoundRobinPartitioner will only send to half of the 
partitions in a topic
 Key: KAFKA-16283
 URL: https://issues.apache.org/jira/browse/KAFKA-16283
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen


When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect 
data to be sent to all partitions in a round-robin manner. But we found that 
only half of the partitions got the data. This causes half of the 
resources (storage, consumers, ...) to be wasted.


{code:java}
bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
localhost:9092 --partitions 2 

Created topic quickstart-events4.

bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 
--record-size 100 --throughput -1 --producer-props 
bootstrap.servers=localhost:9092 
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 
132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.

lukchen@lukchen-mac kafka % ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel  1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
lukchen@lukchen-mac kafka % ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic

2024-02-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16283:
--
Description: 
When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect 
data to be sent to all partitions in a round-robin manner. But we found that 
only half of the partitions got the data. This causes half of the 
resources (storage, consumers, ...) to be wasted.


{code:java}
> bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> localhost:9092 --partitions 2 

Created topic quickstart-events4.

> bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 
> --record-size 100 --throughput -1 --producer-props 
> bootstrap.servers=localhost:9092 
> partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 
132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.

> ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel  1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata

> ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
{code}


  was:
When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect 
data to be sent to all partitions in a round-robin manner. But we found that 
only half of the partitions got the data. This causes half of the 
resources (storage, consumers, ...) to be wasted.


{code:java}
bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
localhost:9092 --partitions 2 

Created topic quickstart-events4.

bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 
--record-size 100 --throughput -1 --producer-props 
bootstrap.servers=localhost:9092 
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 
132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.

lukchen@lukchen-mac kafka % ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel  1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
lukchen@lukchen-mac kafka % ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
.timeindex
-rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
{code}



> RoundRobinPartitioner will only send to half of the partitions in a topic
> -
>
> Key: KAFKA-16283
> URL: https://issues.apache.org/jira/browse/KAFKA-16283
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Priority: Major
>
> When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we 
> expect data to be sent to all partitions in a round-robin manner. But we found 
> that only half of the partitions got the data. This causes half of the 
> resources (storage, consumers, ...) to be wasted.
> {code:java}
> > bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> > localhost:9092 --partitions 2 
> Created topic quickstart-events4.
> > 

Re: [PR] [KAFKA-10718][Kafka Connect]add config settting, skip record when enc… [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #9592:
URL: https://github.com/apache/kafka/pull/9592#issuecomment-1954057264

   This proposed a public API change so this requires a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
   
   Considering the age of this PR, I'll close it for now. Feel free to open a 
KIP if you still want this feature.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [KAFKA-10718][Kafka Connect]add config settting, skip record when enc… [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #9592: [KAFKA-10718][Kafka Connect]add config 
settting, skip record when enc…
URL: https://github.com/apache/kafka/pull/9592


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10325: KIP-649 implementation [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-1954051810

   KIP-649 was never voted and is now abandoned so I'll close this PR. Feel 
free to restart the discussion on the KIP if it's still something you want to 
do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10325: KIP-649 implementation [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #9101: KAFKA-10325: KIP-649 implementation
URL: https://github.com/apache/kafka/pull/9101


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10325: KIP-649 implementation [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-1954051581

   KIP-649 was never voted and is now abandoned so I'll close this PR. Feel 
free to restart the discussion on the KIP if it's still something you want to 
do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10369 [WIP] KIP-655 implementation [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #9210:
URL: https://github.com/apache/kafka/pull/9210#issuecomment-1954049633

   KIP-655 was never voted and is now abandoned so I'll close this PR. Feel 
free to restart the discussion on the KIP if it's still something you want to 
do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10369 [WIP] KIP-655 implementation [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #9210: KAFKA-10369 [WIP] KIP-655 implementation
URL: https://github.com/apache/kafka/pull/9210


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10281: [WIP] Add log compression analysis tool KIP-640 [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #9193:
URL: https://github.com/apache/kafka/pull/9193#issuecomment-1954047088

   Considering KIP-640 was never voted and is now abandoned, I'll close this 
PR. Feel free to restart the discussion on the KIP if it's still something you 
want to do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10281: [WIP] Add log compression analysis tool KIP-640 [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #9193: KAFKA-10281: [WIP] Add log compression 
analysis tool KIP-640
URL: https://github.com/apache/kafka/pull/9193


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10116: GraalVM native-image prototype [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #8830:
URL: https://github.com/apache/kafka/pull/8830#issuecomment-1954044169

   This is now being done via KIP-974. Considering the age of this PR, I'll 
close it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10116: GraalVM native-image prototype [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #8830: KAFKA-10116: GraalVM native-image prototype
URL: https://github.com/apache/kafka/pull/8830


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16281) Possible IllegalState with KIP-996

2024-02-20 Thread Jack Vanlightly (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Vanlightly updated KAFKA-16281:

Description: 
I have a TLA+ model of KIP-996 (pre-vote) and I have identified an IllegalState 
exception that would occur with the existing MaybeHandleCommonResponse behavior.

The issue stems from the fact that a leader, let's call it r1, can resign 
(either due to a restart or check quorum) and then later initiate a pre-vote 
where it ends up in the same epoch as before. When r1 receives a response from 
r2 who believes that r1 is still the leader, the logic in 
MaybeHandleCommonResponse tries to transition r1 to follower of itself, causing 
an IllegalState exception to be raised.

This is an example history:
 # r1 is the leader in epoch 1.
 # r1 quorum resigns, or restarts and resigns.
 # r1 experiences an election timeout and transitions to Prospective.
 # r1 sends a pre vote request to its peers.
 # r2 thinks r1 is still the leader, sends a vote response, not granting its 
vote and setting leaderId=r1 and epoch=1.
 # r1 receives the vote response and executes MaybeHandleCommonResponse which 
tries to transition r1 to Follower of itself and an illegal state occurs.

The relevant else if statement in MaybeHandleCommonResponse is here: 
[https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538]

In the TLA+ specification, I fixed this issue by adding a fourth condition to 
this statement: that the replica must not be in the Prospective state. 
[https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336|https://github.com/Vanlightly/kafka-tlaplus/blob/421f170ba4bd8c5eceb36b88b47901ee3d9c3d2a/kraft/kip_996/kraft_kip_996_functions.tla#L336]

 

Note that I also had to implement the sending of the BeginQuorumEpoch request 
by the leader to prevent a replica from getting stuck in Prospective. If the replica 
r2 has an election timeout due to a transient connectivity issue with the 
leader, but has also fallen behind slightly, then r2 will remain stuck as a 
Prospective because none of its peers, who have connectivity to the leader, 
will grant it a pre-vote. To enable r2 to become a functional member again, the 
leader must give it a nudge with a BeginQuorumEpoch request. The alternative 
(which I have also modeled) is for a Prospective to transition to Follower when 
it receives a negative pre-vote response with a non-null leaderId. This comes 
with a separate liveness issue which I can discuss if this "transition to 
Follower" approach is interesting. Either way, a stuck Prospective needs a way 
to transition to follower eventually, if all other members have a stable leader.
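
For illustration only, the shape of the guard with the proposed fourth condition 
might look like the sketch below. This is not the actual KafkaRaftClient code; the 
method and state names are hypothetical approximations of the KIP-996 design (the 
Prospective state does not exist in the current codebase).

{code:java}
import java.util.OptionalInt;

// Sketch only: the decision in MaybeHandleCommonResponse that the extra condition targets.
final class FollowerTransitionSketch {
    enum State { UNATTACHED, PROSPECTIVE, CANDIDATE, FOLLOWER, LEADER, RESIGNED }

    // Existing idea: same epoch, the response names a leader, and we have no local leader.
    // Proposed fourth condition: the replica must not be Prospective, so a pre-voting
    // ex-leader never tries to transition to follower of itself.
    static boolean shouldTransitionToFollower(int responseEpoch,
                                              int localEpoch,
                                              OptionalInt responseLeaderId,
                                              boolean hasLocalLeader,
                                              State state) {
        return responseEpoch == localEpoch
                && responseLeaderId.isPresent()
                && !hasLocalLeader
                && state != State.PROSPECTIVE;
    }
}
{code}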

 

  was:
I have a TLA+ model of KIP-966 and I have identified an IllegalState exception 
that would occur with the existing MaybeHandleCommonResponse behavior.

The issue stems from the fact that a leader, let's call it r1, can resign 
(either due to a restart or check quorum) and then later initiate a pre-vote 
where it ends up in the same epoch as before, but a cleared local leader id. 
When r1 transitions to Prospective it clears its local leader id. When r1 
receives a response from r2 who believes that r1 is still the leader, the logic 
in MaybeHandleCommonResponse tries to transition r1 to follower of itself, 
causing an IllegalState exception to be raised.

This is an example history:
 # r1 is the leader in epoch 1.
 # r1 quorum resigns, or restarts and resigns.
 # r1 experiences an election timeout and transitions to Prospective clearing 
its local leader id.
 # r1 sends a pre vote request to its peers.
 # r2 thinks r1 is still the leader, sends a vote response, not granting its 
vote and setting leaderId=r1 and epoch=1.
 # r1 receives the vote response and executes MaybeHandleCommonResponse which 
tries to transition r1 to Follower of itself and an illegal state occurs.

The relevant else if statement in MaybeHandleCommonResponse is here: 
https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538

In the TLA+ specification, I fixed this issue by adding a fourth condition to 
this statement, that the leaderId also does not equal this server's id. 
[https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336]

We should probably create a test to confirm the issue first and then look at 
using the fix I made in the TLA+, though there may be other options.


> Possible IllegalState with KIP-996
> --
>
> Key: KAFKA-16281
> URL: https://issues.apache.org/jira/browse/KAFKA-16281
> Project: Kafka
>  Issue Type: Task
>

Re: [PR] KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #8846: KAFKA-9800: [KIP-580] Client Exponential 
Backoff Implementation
URL: https://github.com/apache/kafka/pull/8846


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-1954041148

   Done in https://github.com/apache/kafka/pull/14111, closing this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] java 8 code level update [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #6454: java 8 code level update
URL: https://github.com/apache/kafka/pull/6454


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] java 8 code level update [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #6454:
URL: https://github.com/apache/kafka/pull/6454#issuecomment-1954032896

   It looks like most of these changes have been made since the PR was originally 
opened. Considering its age, I'm going to close it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-228 Negative record timestamp support [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #5072:
URL: https://github.com/apache/kafka/pull/5072#issuecomment-1954026617

   The KIP was never voted and is currently abandoned, so I'll close this PR for 
now. Feel free to restart the discussion on the KIP if it's still something you 
want to do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-228 Negative record timestamp support [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #5072:  KIP-228 Negative record timestamp support
URL: https://github.com/apache/kafka/pull/5072


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16281) Possible IllegalState with KIP-996

2024-02-20 Thread Jack Vanlightly (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Vanlightly updated KAFKA-16281:

Summary: Possible IllegalState with KIP-996  (was: Probable IllegalState 
possible with KIP-966)

> Possible IllegalState with KIP-996
> --
>
> Key: KAFKA-16281
> URL: https://issues.apache.org/jira/browse/KAFKA-16281
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Reporter: Jack Vanlightly
>Priority: Major
>
> I have a TLA+ model of KIP-966 and I have identified an IllegalState 
> exception that would occur with the existing MaybeHandleCommonResponse 
> behavior.
> The issue stems from the fact that a leader, let's call it r1, can resign 
> (either due to a restart or check quorum) and then later initiate a pre-vote 
> where it ends up in the same epoch as before, but a cleared local leader id. 
> When r1 transitions to Prospective it clears its local leader id. When r1 
> receives a response from r2 who believes that r1 is still the leader, the 
> logic in MaybeHandleCommonResponse tries to transition r1 to follower of 
> itself, causing an IllegalState exception to be raised.
> This is an example history:
>  # r1 is the leader in epoch 1.
>  # r1 quorum resigns, or restarts and resigns.
>  # r1 experiences an election timeout and transitions to Prospective clearing 
> its local leader id.
>  # r1 sends a pre vote request to its peers.
>  # r2 thinks r1 is still the leader, sends a vote response, not granting its 
> vote and setting leaderId=r1 and epoch=1.
>  # r1 receives the vote response and executes MaybeHandleCommonResponse which 
> tries to transition r1 to Follower of itself and an illegal state occurs.
> The relevant else if statement in MaybeHandleCommonResponse is here: 
> https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538
> In the TLA+ specification, I fixed this issue by adding a fourth condition to 
> this statement, that the leaderId also does not equal this server's id. 
> [https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336]
> We should probably create a test to confirm the issue first and then look at 
> using the fix I made in the TLA+, though there may be other options.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-2111: Add help arguments and required fields [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #3605:
URL: https://github.com/apache/kafka/pull/3605#issuecomment-1954006952

   This PR is very old and the tools are being rewritten in Java, so I'll close it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-2111: Add help arguments and required fields [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #3605: KAFKA-2111: Add help arguments and required 
fields
URL: https://github.com/apache/kafka/pull/3605


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update KafkaLog4jAppender.java adding SaslMechanism as an option [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #3476:
URL: https://github.com/apache/kafka/pull/3476#issuecomment-1954004851

   Considering the age of this PR and that KIP-719 to deprecate the log4j 
appender has been voted, I'll close this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update KafkaLog4jAppender.java adding SaslMechanism as an option [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #3476: Update KafkaLog4jAppender.java adding 
SaslMechanism as an option
URL: https://github.com/apache/kafka/pull/3476


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-3663 : Implements KIP-59 for a kafka-brokers.sh command [kafka]

2024-02-20 Thread via GitHub


mimaison commented on PR #1539:
URL: https://github.com/apache/kafka/pull/1539#issuecomment-1953997528

   KIP-59 has never been voted and is abandoned, so I'm closing this PR. Feel free 
to restart the KIP discussion if it's still something you want to do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-3663 : Implements KIP-59 for a kafka-brokers.sh command [kafka]

2024-02-20 Thread via GitHub


mimaison closed pull request #1539: KAFKA-3663 : Implements KIP-59 for a 
kafka-brokers.sh command
URL: https://github.com/apache/kafka/pull/1539


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16282:
--
Description: 
Currently, when using `kafka-get-offsets.sh` to get the offset by time, we have 
these choices:


{code:java}
--time  /  timestamp of the offsets before 
that. 
  -1 or latest /   [Note: No offset is returned, if the
  -2 or earliest / timestamp greater than recently
  -3 or max-timestamp /committed record timestamp is
  -4 or earliest-local /   given.] (default: latest)
  -5 or latest-tiered  

{code}

For the "latest" option, it'll always return the "high watermark" because we 
always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
would be good if the command can support to get the last stable offset (LSO) 
for transaction support. That is, sending the option with 
*IsolationLevel.READ_COMMITTED*
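
For reference, the requested behavior maps onto the Admin API's listOffsets call, 
which accepts an isolation level; a minimal sketch (topic name, partition and 
bootstrap server are placeholders):

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class LsoSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // READ_COMMITTED makes the "latest" spec resolve to the last stable offset
            // (LSO) instead of the high watermark returned under READ_UNCOMMITTED.
            ListOffsetsResult result = admin.listOffsets(
                    Map.of(tp, OffsetSpec.latest()),
                    new ListOffsetsOptions(IsolationLevel.READ_COMMITTED));
            System.out.println("LSO of " + tp + ": " + result.partitionResult(tp).get().offset());
        }
    }
}
{code}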


  was:
Currently, when using `kafka-get-offsets.sh` to get the offset by time, we have 
these choices:


{code:java}
--time  /  timestamp of the offsets before 
that. 
  -1 or latest /   [Note: No offset is returned, if the
  -2 or earliest / timestamp greater than recently
  -3 or max-timestamp /committed record timestamp is
  -4 or earliest-local /   given.] (default: latest)
  -5 or latest-tiered  

{code}

For the latest choice, it'll always return the "high watermark" because we 
always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
would be good if the command can support to get the last stable offset (LSO) 
for transaction support. That is, sending the option with 
*IsolationLevel.READ_COMMITTED*



> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time  /  timestamp of the offsets before 
> that. 
>   -1 or latest /   [Note: No offset is returned, if 
> the
>   -2 or earliest / timestamp greater than recently
>   -3 or max-timestamp /committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered  
> {code}
> For the "latest" option, it'll always return the "high watermark" because we 
> always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command can support to get the last stable offset (LSO) 
> for transaction support. That is, sending the option with 
> *IsolationLevel.READ_COMMITTED*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-20 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818759#comment-17818759
 ] 

Luke Chen commented on KAFKA-16282:
---

Great! Thanks [~ahmedsobeh]! Let me know if you need any help.

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time  /  timestamp of the offsets before 
> that. 
>   -1 or latest /   [Note: No offset is returned, if 
> the
>   -2 or earliest / timestamp greater than recently
>   -3 or max-timestamp /committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered  
> {code}
> For the latest choice, it'll always return the "high watermark" because we 
> always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command can support to get the last stable offset (LSO) 
> for transaction support. That is, sending the option with 
> *IsolationLevel.READ_COMMITTED*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2024-02-20 Thread Ondrej Cervinka (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818755#comment-17818755
 ] 

Ondrej Cervinka commented on KAFKA-15302:
-

Hello,

Deleting from a state store while iterating it with KeyValueStore#all or 
KeyValueStore#range is a pattern which I sometimes use too. Actually, it may be 
a quite common approach for this forward-delete scenario.

If there won't be a fix, updating the documentation would be helpful.

 

Are these all valid workarounds? Note that some are quite limiting.

 

1. Flush before iterating

2. Delete (modify) only after iteration is finished (store the keys for deletion in 
a temporary set); see the sketch after this list

3. Use in-memory stores instead of RocksDB

4. Disable the Streams (heap) cache (or set statestore.cache.max.bytes to 0)
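
A minimal sketch of workaround 2, assuming a String/Integer store and the new 
processor API as in the reproducer quoted below (kvStore and context are 
placeholders):

{code:java}
// Sketch: remember keys while iterating, mutate the store only after the iterator is closed.
final java.util.List<String> toDelete = new java.util.ArrayList<>();
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
    while (iter.hasNext()) {
        final KeyValue<String, Integer> entry = iter.next();
        context.forward(new Record<>(entry.key, entry.value, context.currentSystemTimeMs()));
        toDelete.add(entry.key); // defer the delete
    }
}
for (final String key : toDelete) {
    kvStore.delete(key); // safe: the iteration over the store has finished
}
{code}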

 

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     //
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg =
>             new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
> 

Re: [PR] MINIOR: Remove accidentally logs [kafka]

2024-02-20 Thread via GitHub


mimaison merged PR #15371:
URL: https://github.com/apache/kafka/pull/15371


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-20 Thread via GitHub


Phuc-Hong-Tran commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953901380

   @cadonna I'll see if there is a way to get around using the Google library to 
check regex validity (fingers crossed!)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-20 Thread via GitHub


cadonna commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953893744

   > > > I was wondering whether we should introduce a new error code to signal 
to the user that the regular expression is invalid. Otherwise, we would have to 
use the invalid request exception and it does not seem user friendly. @cadonna 
@lianetm What do you think about this?
   > > 
   > > 
   > > @dajac Is it also possible to verify the validity of the regular 
expression client-side?
   > > If we assume that the clients send valid regular expressions to the 
brokers, I think it would be OK to return an invalid request exception and log 
the error broker-side. Sending invalid regular expressions should then just be 
a mistake that happens during development of the clients and not something that 
happens during usage of the clients.
   > > The benefit would be to find the mistakes in regular expressions without 
a request to the brokers.
   > > The downside of it is that we need some way to validate the regular 
expressions client-side like the corresponding Google library in Java and I do 
not know what dependencies are needed for clients in other languages.
   > 
   > I don't think we can include the Google library in the client code. I saw 
the comment about it on the pull request for the implementation of the regex 
logic on the broker. Will find it again and quote it here.
   > 
   > Edit: here is the comment [#14327 
(review)](https://github.com/apache/kafka/pull/14327#pullrequestreview-1622366023)
   
   Ah, I see! Thanks for the link!
   Nevertheless, there it says the resolution of the regular expression 
regarding the existing topics on the brokers. It does not say anything about 
checking the general validity of the regular expression.
   I definitely agree that finding topics that match the regular expression is 
something that needs to be strictly done on the brokers.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-20 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818729#comment-17818729
 ] 

Ahmed Sobeh edited comment on KAFKA-16282 at 2/20/24 10:14 AM:
---

Hi [~showuon]! I will be picking this up. I will start on the KIP doc and post 
it here once it's ready


was (Author: JIRAUSER295920):
Hi [~showuon]! I will be picking this up. I will start on the KIP docs and post 
it here once it's ready

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time  /  timestamp of the offsets before 
> that. 
>   -1 or latest /   [Note: No offset is returned, if 
> the
>   -2 or earliest / timestamp greater than recently
>   -3 or max-timestamp /committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered  
> {code}
> For the latest choice, it'll always return the "high watermark" because we 
> always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command can support to get the last stable offset (LSO) 
> for transaction support. That is, sending the option with 
> *IsolationLevel.READ_COMMITTED*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-20 Thread via GitHub


Phuc-Hong-Tran commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953879987

   > > I was wondering whether we should introduce a new error code to signal 
to the user that the regular expression is invalid. Otherwise, we would have to 
use the invalid request exception and it does not seem user friendly. @cadonna 
@lianetm What do you think about this?
   > 
   > 
   > 
   > @dajac Is it also possible to verify the validity of the regular 
expression client-side?
   > 
   > If we assume that the clients send valid regular expressions to the 
brokers, I think it would be OK to return an invalid request exception and log 
the error broker-side. Sending invalid regular expressions should then just be 
a mistake that happens during development of the clients and not something that 
happens during usage of the clients.
   > 
   > 
   > 
   > The benefit would be to find the mistakes in regular expressions without a 
request to the brokers.  
   > 
   >  
   > 
   > The downside of it is that we need some way to validate the regular 
expressions client-side like the corresponding Google library in Java and I do 
not know what dependencies are needed for clients in other languages.
   > 
   > 
   
   I don't think we can include the Google library in the client code. I saw 
the comment about it on the pull request for the implementation of the regex 
logic on the broker. Will find it again and quote it here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-20 Thread via GitHub


cadonna commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953872828

   > I was wondering whether we should introduce a new error code to signal to 
the user that the regular expression is invalid. Otherwise, we would have to 
use the invalid request exception and it does not seem user friendly. @cadonna 
@lianetm What do you think about this?
   
   @dajac Is it also possible to verify the validity of the regular expression 
client-side?
   If we assume that the clients send valid regular expressions to the brokers, 
I think it would be OK to return an invalid request exception and log the error 
broker-side. Sending invalid regular expressions should then just be a mistake 
that happens during development of the clients and not something that happens 
during usage of the clients.
   
   The benefit would be to find the mistakes in regular expressions without a 
request to the brokers.  

   The downside of it is that we need some way to validate the regular 
expressions client-side like the corresponding Google library in Java and I do 
not know what dependencies are needed for clients in other languages.
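
   If a purely client-side sanity check is wanted, a minimal sketch with the JDK 
regex engine could look like the following. This is only an approximation: the 
broker resolves `SubscriptionPattern` with RE2 syntax, which is not identical to 
`java.util.regex`, so such a check can only catch obviously malformed input.

   ```java
   import java.util.regex.Pattern;
   import java.util.regex.PatternSyntaxException;

   // Sketch only: reject clearly malformed patterns before sending them to the broker.
   public final class RegexPreCheck {
       public static void validateOrThrow(String regex) {
           try {
               Pattern.compile(regex);
           } catch (PatternSyntaxException e) {
               throw new IllegalArgumentException("Invalid subscription pattern: " + regex, e);
           }
       }
   }
   ```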
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-20 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818729#comment-17818729
 ] 

Ahmed Sobeh commented on KAFKA-16282:
-

Hi [~showuon]! I will be picking this up. I will start on the KIP docs and post 
it here once it's ready

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time  /  timestamp of the offsets before 
> that. 
>   -1 or latest /   [Note: No offset is returned, if 
> the
>   -2 or earliest / timestamp greater than recently
>   -3 or max-timestamp /committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered  
> {code}
> For the latest choice, it'll always return the "high watermark" because we 
> always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command can support to get the last stable offset (LSO) 
> for transaction support. That is, sending the option with 
> *IsolationLevel.READ_COMMITTED*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group

2024-02-20 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16243:
---
Fix Version/s: 3.8.0

> Idle kafka-console-consumer with new consumer group protocol preemptively 
> leaves group
> --
>
> Key: KAFKA-16243
> URL: https://issues.apache.org/jira/browse/KAFKA-16243
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Lucas Brutschy
>Priority: Critical
> Fix For: 3.8.0
>
>
> Using the new consumer group protocol with kafka-console-consumer.sh, I find 
> that if I leave the consumer with no records to process for 5 minutes 
> (max.poll.interval.ms = 30ms), the tool logs the following warning 
> message and leaves the group.
> "consumer poll timeout has expired. This means the time between subsequent 
> calls to poll() was longer than the configured max.poll.interval.ms, which 
> typically implies that the poll loop is spending too much time processing 
> messages. You can address this either by increasing max.poll.interval.ms or 
> by reducing the maximum size of batches returned in poll() with 
> max.poll.records."
> With the older consumer, this did not occur.
> The reason is that the consumer keeps a poll timer which is used to ensure 
> liveness of the application thread. In the classic consumer, the poll timer 
> automatically updates while the `Consumer.poll(Duration)` method is blocked, 
> whereas the new consumer only updates the poll timer when a new call to 
> `Consumer.poll(Duration)` is issued. This means that the 
> kafka-console-consumer.sh tool, which uses a very long timeout by default, 
> works differently with the new consumer.
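
For context, the tool's usage pattern boils down to blocking in poll() with a very 
long timeout, roughly like the minimal sketch below (property values and topic name 
are illustrative):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LongPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.protocol", "consumer"); // opt in to the new consumer group protocol
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                // The classic consumer keeps its poll timer alive while blocked here;
                // the issue above is that the new consumer only reset it on a new poll() call.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(10));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}
{code}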



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-20 Thread Ahmed Sobeh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Sobeh reassigned KAFKA-16282:
---

Assignee: Ahmed Sobeh

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time  /  timestamp of the offsets before 
> that. 
>   -1 or latest /   [Note: No offset is returned, if 
> the
>   -2 or earliest / timestamp greater than recently
>   -3 or max-timestamp /committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered  
> {code}
> For the latest choice, it'll always return the "high watermark" because we 
> always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command can support to get the last stable offset (LSO) 
> for transaction support. That is, sending the option with 
> *IsolationLevel.READ_COMMITTED*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-20 Thread via GitHub


lucasbru merged PR #15372:
URL: https://github.com/apache/kafka/pull/15372


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] IGNORE: flaky testing [kafka]

2024-02-20 Thread via GitHub


lucasbru closed pull request #14780: IGNORE: flaky testing
URL: https://github.com/apache/kafka/pull/14780


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-20 Thread via GitHub


cadonna commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953815656

   > > > @cadonna @lianetm, since we're supporting both subscribe methods, using 
java.util.regex.Pattern and SubscriptionPattern, I think we should throw an 
illegal heartbeat exception when the user tries to use both methods at the same 
time and inform the user to use one at a time, since the field SubscribedRegex is 
used for java.util.regex.Pattern as well as SubscriptionPattern. What do you 
guys think?
   > > 
   > > 
   > > IMO, we must support the deprecated pattern subscriptions with 
`java.util.regex.Pattern` to ensure backwards compatibility, but we do not need 
to support mixed usage of `java.util.regex.Pattern` and Google regex patterns. 
I think this is a blind spot in the KIP. I propose to throw an 
`IllegalStateException` if `subscribe(java.util.regex.Pattern)` is called after 
`subscribe(SubscriptionPattern)` (and vice versa) without calling 
`unsubscribe()` in between. That is similar to the restrictions between 
pattern, topic, and partition subscriptions @lianetm linked above. I do not 
think it is worth considering the edge case of mixed usage of the two pattern 
types. Does this make sense to you? \cc @dajac What do you as the original 
author of the KIP think? Should we update the KIP to make this clear?
   > 
   > @cadonna I would rather follow what we already do with `subscribe` today. 
The last one called takes precedence.
   
   @dajac The javadocs of `subscribe()` say:
   
   ```
* @throws IllegalStateException If {@code subscribe()} is called 
previously with topics, or assign is called
*   previously (without a subsequent call 
to {@link #unsubscribe()}), or if not
*   configured at-least one partition 
assignment strategy
   ```
   
https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L727
   
   One could argue that both subscriptions are pattern subscriptions, but they 
are quite different internally. I am wondering how complex it is to allow mixed 
usage.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


