[jira] [Updated] (KAFKA-16251) Fenced member should not send heartbeats while waiting for onPartitionsLost to complete
[ https://issues.apache.org/jira/browse/KAFKA-16251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16251: -- Priority: Critical (was: Major) > Fenced member should not send heartbeats while waiting for onPartitionsLost > to complete > --- > > Key: KAFKA-16251 > URL: https://issues.apache.org/jira/browse/KAFKA-16251 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > > When a member gets fenced, it transitions to FENCED state and triggers the > onPartitionsLost callback to release its assignment. Members should stop > sending heartbeats while FENCED, and resume sending them only after completing > the callback, when they transition to JOINING. -- This message was sent by Atlassian Jira (v8.20.10#820010)
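A minimal sketch of the heartbeat gating this issue asks for. It assumes a hypothetical, heavily simplified `MemberStateMachine` (not the consumer's actual membership manager), in which a `CompletableFuture` stands in for the `onPartitionsLost` callback:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified member states (the real consumer has more).
enum MemberState { STABLE, FENCED, JOINING }

// Hypothetical sketch, not Kafka's membership manager.
class MemberStateMachine {
    // volatile because the callback future may complete on another thread
    private volatile MemberState state = MemberState.STABLE;

    MemberState state() {
        return state;
    }

    // No heartbeats while FENCED; they resume once the member is JOINING.
    boolean shouldSendHeartbeat() {
        return state != MemberState.FENCED;
    }

    // Enter FENCED and move to JOINING only after the onPartitionsLost stand-in
    // completes successfully (error handling is omitted in this sketch).
    CompletableFuture<Void> onFenced(CompletableFuture<Void> onPartitionsLost) {
        state = MemberState.FENCED;
        return onPartitionsLost.thenRun(() -> state = MemberState.JOINING);
    }
}
```

Gating on the callback future rather than on a timer keeps the member silent for exactly as long as the callback runs.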
[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15561: -- Priority: Major (was: Critical) > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support, regex > Fix For: 3.8.0 > > > The new consumer should support subscribing with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server.
[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15538: -- Priority: Blocker (was: Critical) > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Blocker > Labels: kip-848-client-support, newbie, regex > Fix For: 3.8.0 > > > When subscribing with a Java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscriptionPattern`. Subscribing > with a Java `Pattern` will still be supported for a while, but will > eventually be removed. > * When subscribing with a SubscriptionPattern, the client should > just send the regex to the broker, which resolves it on the server side. > * When subscribing with a Pattern, the regex should be resolved on > the client side. > As part of this task, we should re-enable all integration tests defined in > the PlainTextAsyncConsumer that relate to subscription with pattern and that > are currently disabled for the new consumer + new protocol.
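The two subscription modes described in KAFKA-15561 and KAFKA-15538 can be sketched as follows. This is an illustration under assumptions: `HeartbeatSubscription` is a hypothetical holder for the heartbeat's regex and topic-name fields, and the collection of known topics is assumed to come from cluster metadata. Only the `java.util.regex.Pattern` case is resolved locally. A `SubscriptionPattern` (RE2/J syntax) is forwarded as a plain string for the broker to resolve:

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical holder for the subscription-related heartbeat fields.
final class HeartbeatSubscription {
    final Optional<String> topicRegex; // resolved broker-side
    final List<String> topicNames;     // resolved client-side

    private HeartbeatSubscription(Optional<String> topicRegex, List<String> topicNames) {
        this.topicRegex = topicRegex;
        this.topicNames = topicNames;
    }

    // SubscriptionPattern case: send the regex as-is and let the broker resolve it.
    static HeartbeatSubscription forServerSideRegex(String regex) {
        return new HeartbeatSubscription(Optional.of(regex), List.of());
    }

    // java.util.regex.Pattern case: match whole topic names locally and send the
    // resulting de-duplicated, sorted list of names.
    static HeartbeatSubscription forClientSidePattern(Pattern pattern, Collection<String> knownTopics) {
        List<String> matched = knownTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .distinct()
                .sorted()
                .collect(Collectors.toList());
        return new HeartbeatSubscription(Optional.empty(), matched);
    }
}
```

Re-resolving the pattern whenever topic metadata changes, and excluding internal topics, are left out of this sketch.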
Re: [PR] KAFKA-16286; Notify listener of latest leader and epoch [kafka]
jsancio commented on code in PR #15397: URL: https://github.com/apache/kafka/pull/15397#discussion_r1496389863
## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ##
@@ -2958,6 +2958,59 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
         assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
     }
+    @Test
+    public void testHandleLeaderChangeFiresAfterUnattachedRegistration() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 7;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .build();
+
+        // Register another listener and verify that it is notified of latest epoch
+        RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(
+            OptionalInt.of(localId)
+        );
+        context.client.register(secondListener);
+        context.client.poll();
+
+        // Expected leader change notification
+        LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), epoch);
+        assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+
+        // Transition to follower and observe the leader change
+        context.deliverRequest(context.beginEpochRequest(epoch, otherNodeId));
+        context.pollUntilResponse();
+
+        // Expected leader change notification
+        expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(otherNodeId), epoch);
+        assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+    }
+
+    @Test
+    public void testHandleLeaderChangeFiresAfterFollowerRegistration() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 7;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, otherNodeId)
+            .build();
+
+        // Register another listener and verify that it is notified of latest epoch
+        RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(
+            OptionalInt.of(localId)
+        );
+        context.client.register(secondListener);
+        context.client.poll();
+
+        LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(otherNodeId), epoch);
+        assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+    }
Review Comment: Both of these tests fail against trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16286; Notify listener of latest leader and epoch [kafka]
jsancio opened a new pull request, #15397: URL: https://github.com/apache/kafka/pull/15397 KRaft was only notifying listeners of the latest leader and epoch when the replica transitioned to a new state. This can result in a listener never getting notified if it registered after the replica had become a follower. This problem doesn't exist for the active leader because the KRaft implementation attempts to notify the listener of the latest leader and epoch when the replica is the active leader. This issue is fixed by notifying the listeners of the latest leader and epoch after processing the listener registration request. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
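A minimal sketch of the idea behind the fix. It assumes a hypothetical single-threaded `ListenerRegistry` and a simplified `LeaderAndEpoch` stand-in, neither of which is the `RaftClient` API. In the PR the notification happens after the registration is processed on a later `poll()`; this sketch collapses that step into `register`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.function.Consumer;

// Simplified stand-in for KRaft's LeaderAndEpoch value type.
final class LeaderAndEpoch {
    final OptionalInt leaderId;
    final int epoch;

    LeaderAndEpoch(OptionalInt leaderId, int epoch) {
        this.leaderId = leaderId;
        this.epoch = epoch;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LeaderAndEpoch)) return false;
        LeaderAndEpoch other = (LeaderAndEpoch) o;
        return epoch == other.epoch && leaderId.equals(other.leaderId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderId, epoch);
    }

    @Override
    public String toString() {
        return "LeaderAndEpoch(leaderId=" + leaderId + ", epoch=" + epoch + ")";
    }
}

// Hypothetical registry illustrating the notification rule.
final class ListenerRegistry {
    private final List<Consumer<LeaderAndEpoch>> listeners = new ArrayList<>();
    private LeaderAndEpoch latest = new LeaderAndEpoch(OptionalInt.empty(), 0);

    // A newly registered listener is told the latest leader and epoch right away
    // instead of waiting for the next state transition.
    void register(Consumer<LeaderAndEpoch> listener) {
        listeners.add(listener);
        listener.accept(latest);
    }

    // State transitions still notify every registered listener.
    void onStateTransition(LeaderAndEpoch update) {
        latest = update;
        for (Consumer<LeaderAndEpoch> listener : listeners) {
            listener.accept(update);
        }
    }
}
```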
Re: [PR] MINOR: remove unnecessary logging [kafka]
mjsax commented on code in PR #15396: URL: https://github.com/apache/kafka/pull/15396#discussion_r1496379447
## streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java: ##
@@ -264,7 +264,6 @@ public void put(final Bytes key,
         final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
         if (segment == null) {
             expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
-            LOG.warn("Skipping record for expired segment.");
Review Comment: I don't see much value in it. We don't log any detailed information; the log line only gives you a processing-time timestamp and nothing more. If we wanted to make it useful, we could add offset/timestamp metadata (but not the key/value, as we never log data). In that case, it might be useful to keep it at DEBUG or TRACE. Let's hear from others about it.
Re: [PR] MINOR: remove unnecessary logging [kafka]
kpatelatwork commented on code in PR #15396: URL: https://github.com/apache/kafka/pull/15396#discussion_r1496373291
## streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java: ##
@@ -264,7 +264,6 @@ public void put(final Bytes key,
         final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
         if (segment == null) {
             expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
-            LOG.warn("Skipping record for expired segment.");
Review Comment: Is there value in converting this to DEBUG instead of removing it?
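Following the direction mjsax suggests, the drop can be counted unconditionally, while a log line is emitted only at a debug-like level and carries offset/timestamp metadata but never the key or value. A minimal sketch, using `java.util.logging` (`FINE`, roughly SLF4J's DEBUG) as a stand-in for the SLF4J logger Kafka Streams uses, and a hypothetical `ExpiredRecordHandler` whose counter stands in for `expiredRecordSensor`:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; the counter stands in for expiredRecordSensor.
final class ExpiredRecordHandler {
    private static final Logger LOG = Logger.getLogger(ExpiredRecordHandler.class.getName());
    private final AtomicLong droppedRecords = new AtomicLong();

    void onExpiredSegment(String topic, int partition, long offset, long recordTimestamp) {
        // Always count the drop, as the metric already does.
        droppedRecords.incrementAndGet();
        // Log only when fine-grained logging is enabled; metadata only, never key/value.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(String.format(
                    "Skipping record for expired segment: topic=%s, partition=%d, offset=%d, timestamp=%d",
                    topic, partition, offset, recordTimestamp));
        }
    }

    long droppedRecords() {
        return droppedRecords.get();
    }
}
```

The `isLoggable` guard avoids building the message string when the level is disabled, which matters on a per-record path.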
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1496360067
## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
         return filteredProducerIds;
     }
+    public long getDurationFilter() {
+        return durationFilter;
+    }
+
     @Override
     public String toString() {
         return "ListTransactionsOptions(" +
             "filteredStates=" + filteredStates +
             ", filteredProducerIds=" + filteredProducerIds +
+            ", durationFilter=" + durationFilter +
Review Comment: updated in [8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)
## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
         return filteredProducerIds;
     }
+    public long getDurationFilter() {
Review Comment: updated in [8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)
## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilt
         return this;
     }
+    public ListTransactionsOptions durationFilter(long durationMs) {
Review Comment: updated in [8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)
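A minimal sketch of the duration-filter semantics, under assumptions: `TxnListOptions` and `TxnView` are hypothetical classes that only mimic the fluent style of `ListTransactionsOptions`, a negative value means "no filter", and "running longer than" is read as strictly greater. Since KIP-994 carries the filter in the request, the broker would presumably apply this predicate rather than the client:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical minimal view of an ongoing transaction.
final class TxnView {
    final String transactionalId;
    final long startTimeMs;

    TxnView(String transactionalId, long startTimeMs) {
        this.transactionalId = transactionalId;
        this.startTimeMs = startTimeMs;
    }
}

// Hypothetical options class in the fluent style of ListTransactionsOptions.
final class TxnListOptions {
    private long durationFilterMs = -1L; // assumption: negative means "no filter"

    TxnListOptions durationFilter(long durationMs) {
        this.durationFilterMs = durationMs;
        return this;
    }

    long durationFilter() {
        return durationFilterMs;
    }

    // Keep only transactions that have been running longer than the threshold.
    List<TxnView> apply(List<TxnView> txns, long nowMs) {
        if (durationFilterMs < 0) {
            return txns;
        }
        return txns.stream()
                .filter(txn -> nowMs - txn.startTimeMs > durationFilterMs)
                .collect(Collectors.toList());
    }
}
```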
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1496355495
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##
@@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f
             }
         }
         final long now = time.milliseconds();
-        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
-            private boolean supportsDisablingTopicCreation = true;
+        if (options.useDescribeTopicsApi()) {
+            RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) {
Review Comment: Thanks for the advice. We still want to avoid loading a huge topic into memory, but the current implementation does not really achieve that. I will make a change to address this. Maybe having more than one ongoing call in the pipeline is a bit of over-engineering, so I will keep at most one call in the next iteration.
Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]
ahuang98 commented on PR #14206: URL: https://github.com/apache/kafka/pull/14206#issuecomment-1954880081 Thanks @mimaison, it looks like this is because I split the TopicsImageTest changes out into a separate PR. The migration tests depend on the `DELTA1_RECORDS` defined there, so I moved the TopicsImageTest changes over as well.
[PR] MINOR: remove unnecessary logging [kafka]
mjsax opened a new pull request, #15396: URL: https://github.com/apache/kafka/pull/15396 We already record dropped records via metrics, and logging at WARN level is too noisy. This PR removes the unnecessary logging.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1496339739
## tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java: ##
@@ -187,14 +187,25 @@ private void testDescribeProducers(
         assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
     }
-    @Test
-    public void testListTransactions() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testListTransactions(boolean hasDurationFilter) throws Exception {
         String[] args = new String[] {
             "--bootstrap-server",
             "localhost:9092",
             "list"
         };
+        if (hasDurationFilter) {
+            args = new String[] {
+                "--bootstrap-server",
+                "localhost:9092",
+                "list",
+                "--duration-filter",
+                Long.toString(Long.MAX_VALUE)
Review Comment: This only tests that the CLI command parses its parameters correctly; it does not call the actual list-transactions operation.
[jira] [Updated] (KAFKA-16286) KRaft doesn't always notify listener of latest leader
[ https://issues.apache.org/jira/browse/KAFKA-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16286: --- Description: If a listener registers with RaftClient after the KRaft replica has transitioned to follower, it will not get notified of the current leader until it has transitioned to another state. In a stable cluster, the listeners that are not the leader (inactive controllers and brokers) will only get notified when the leader changes. was: If a listener register with RaftClient after the KRaft replica has transition to follower it will not get notified of the current leader until it has transitioned to another state. In a stable cluster the listeners that are not the active leader (inactive controllers and brokers) will only get notified when then leader changes. > KRaft doesn't always notify listener of latest leader > - > > Key: KAFKA-16286 > URL: https://issues.apache.org/jira/browse/KAFKA-16286 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > > If a listener registers with RaftClient after the KRaft replica has > transitioned to follower, it will not get notified of the current leader until > it has transitioned to another state. > In a stable cluster, the listeners that are not the leader (inactive > controllers and brokers) will only get notified when the leader changes.
[jira] [Created] (KAFKA-16286) KRaft doesn't always notify listener of latest leader
José Armando García Sancio created KAFKA-16286: -- Summary: KRaft doesn't always notify listener of latest leader Key: KAFKA-16286 URL: https://issues.apache.org/jira/browse/KAFKA-16286 Project: Kafka Issue Type: Bug Components: kraft Reporter: José Armando García Sancio Assignee: José Armando García Sancio If a listener registers with RaftClient after the KRaft replica has transitioned to follower, it will not get notified of the current leader until it has transitioned to another state. In a stable cluster, the listeners that are not the active leader (inactive controllers and brokers) will only get notified when the leader changes.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1495047191
## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java: ##
@@ -36,28 +38,38 @@ public class DescribeTopicsResult {
     private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
     private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
+    private final Iterator>> nameFuturesIterator;
     @Deprecated
     protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
-        this(null, futures);
+        this(null, futures, null);
     }
     // VisibleForTesting
-    protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
-        if (topicIdFutures != null && nameFutures != null)
-            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified.");
-        if (topicIdFutures == null && nameFutures == null)
-            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null.");
+    protected DescribeTopicsResult(
+        Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures,
+        Map<String, KafkaFuture<TopicDescription>> nameFutures,
+        Iterator>> nameFuturesIterator
+    ) {
+        if (topicIdFutures != null && nameFutures != null && nameFuturesIterator != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures and nameFutureIterator cannot both be specified.");
Review Comment: nit: using "both" implies that there are 2 things; we now have 3.
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##
@@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f
             }
         }
         final long now = time.milliseconds();
-        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
-            private boolean supportsDisablingTopicCreation = true;
+        if (options.useDescribeTopicsApi()) {
+            RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) {
Review Comment: I didn't quite get how the recurring call framework enables lazy generation. RecurringCall.run() just runs the whole thing in one go.
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##
@@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f
             }
         }
         final long now = time.milliseconds();
-        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
-            private boolean supportsDisablingTopicCreation = true;
+        if (options.useDescribeTopicsApi()) {
+            RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) {
Review Comment: Looking at the current logic, I cannot make sense of it. For some reason, we get all topic names that fit under the batch size, then get all partitions for those topics and put them in memory. Why bother with all the paging complexity if all we do is load all partitions for 2000 topics into memory? Can we just iterate topic by topic (it seems we get the list of all topics anyway) and get the partitions for each one? With paging, I would have expected something like this:
1. Start iteration: issue one call; once the first call completes, maybe issue a few more (say 3) calls ahead for pipelining.
2. Next: iterate over the current batch.
3. Once the current batch is exhausted, switch to the next batch and pipeline one more call.
This way, if the caller finishes iterating early or iterates slowly, we won't keep piling results up in memory.
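The paging scheme in steps 1 to 3 can be sketched as a lazy, cursor-based iterator. This is a minimal sketch under assumptions: `Page`, `PagedIterator`, and the `fetchPage` function are hypothetical stand-ins for the DescribeTopicPartitions round trip, and it pipelines a single request ahead (the "at most one call" variant CalvinConfluent settles on) rather than three. At any time it holds at most the current page plus one in-flight page, so a caller that stops early or iterates slowly does not accumulate results:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical page of results plus an optional cursor for the next page.
final class Page<T> {
    final List<T> items;
    final Optional<String> nextCursor;

    Page(List<T> items, Optional<String> nextCursor) {
        this.items = items;
        this.nextCursor = nextCursor;
    }
}

// Hypothetical lazy iterator: current page plus at most one prefetched page.
final class PagedIterator<T> implements Iterator<T> {
    private final Function<Optional<String>, CompletableFuture<Page<T>>> fetchPage;
    private Page<T> current;
    private int index = 0;
    private CompletableFuture<Page<T>> prefetch; // null when there is no next page

    PagedIterator(Function<Optional<String>, CompletableFuture<Page<T>>> fetchPage) {
        this.fetchPage = fetchPage;
        // Step 1: issue the first call and wait for it.
        this.current = fetchPage.apply(Optional.empty()).join();
        requestNextPage();
    }

    // Pipeline exactly one call ahead of the consumer.
    private void requestNextPage() {
        prefetch = current.nextCursor
                .map(cursor -> fetchPage.apply(Optional.of(cursor)))
                .orElse(null);
    }

    @Override
    public boolean hasNext() {
        // Step 3: when the current batch is exhausted, switch to the next one.
        while (index >= current.items.size()) {
            if (prefetch == null) {
                return false;
            }
            current = prefetch.join();
            index = 0;
            requestNextPage();
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Step 2: iterate over the current batch.
        return current.items.get(index++);
    }
}
```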
## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ##
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I
         } else {
             ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
         }
-        List<TopicDescription> topicDescriptions = new ArrayList<>();
         if (!topicIds.isEmpty()) {
             Map<Uuid, TopicDescription> descTopics = adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-            topicDescriptions = new ArrayList<>(descTopics.values());
+            describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts);
+            return;
         }
         if (!topics.isEmpty()) {
-            Map<String, TopicDescription> descTopics =
-                adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-            topicDescriptions = new ArrayList<>(descTopics.values());
+            final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000);
+            try {
+                Iterator>> des
[jira] [Assigned] (KAFKA-14747) FK join should record discarded subscription responses
[ https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ayoub Omari reassigned KAFKA-14747: --- Assignee: Ayoub Omari (was: Koma Zhang) > FK join should record discarded subscription responses > -- > > Key: KAFKA-14747 > URL: https://issues.apache.org/jira/browse/KAFKA-14747 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Ayoub Omari >Priority: Minor > Labels: beginner, newbie > > FK-joins are subject to a race condition: If the left-hand side record is > updated, a subscription is sent to the right-hand side (including a hash > value of the left-hand side record), and the right-hand side might send back > join responses (also including the original hash). The left-hand side only > processes the responses if the returned hash matches the current hash of the > left-hand side record, because a different hash implies that the left-hand > side record was updated in the meantime (including sending a new > subscription to the right-hand side), and thus the data is stale and the > response should not be processed (joining the response to the new record > could lead to incorrect results). > A similar thing can happen on a right-hand side update that triggers a > response, which might be dropped if the left-hand side record was updated in > parallel. > While the behavior is correct, we don't record when this happens. We should > consider recording this using the existing "dropped record" sensor, or maybe > adding a new sensor.
[PR] KAFKA-14747: record discarded FK join subscription responses [kafka]
AyoubOm opened a new pull request, #15395: URL: https://github.com/apache/kafka/pull/15395 *As described in KAFKA-14747, we are not recording discarded FK join responses when we receive a join response for an old record (whose hash value has changed in the left table). This PR records such responses in the dropped-records sensor.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
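A minimal sketch of the check described in KAFKA-14747, with the discard counted instead of silently ignored. `SubscriptionResponseHandler` is hypothetical, the hashes are modeled as `long[]` values compared with `Arrays.equals` (an assumption about the hash representation), and the counter stands in for the dropped-records sensor:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical left-hand-side handler for FK-join subscription responses.
final class SubscriptionResponseHandler {
    // Stand-in for the dropped-records sensor.
    private final AtomicLong droppedRecords = new AtomicLong();

    // Join only if the response was computed for the current LHS value; a
    // different hash means the LHS record changed meanwhile, so the response is stale.
    boolean shouldJoin(long[] responseHash, long[] currentLhsHash) {
        if (Arrays.equals(responseHash, currentLhsHash)) {
            return true;
        }
        droppedRecords.incrementAndGet(); // record the discard instead of dropping silently
        return false;
    }

    long droppedRecords() {
        return droppedRecords.get();
    }
}
```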
[jira] [Assigned] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16111: - Assignee: Lucas Brutschy > Implement tests for tricky rebalance callback scenarios > --- > > Key: KAFKA-16111 > URL: https://issues.apache.org/jira/browse/KAFKA-16111 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Blocker > Labels: callback, consumer-threading-refactor, integration-tests > Fix For: 3.8.0 > > > There is justified concern that the new threading model may not play well > with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide > some assurance that it will support complicated patterns. > # Design and implement test scenarios > # Update and document any design changes with the callback sub-system where > needed > # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by > said design
[jira] [Assigned] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer
[ https://issues.apache.org/jira/browse/KAFKA-16285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16285: - Assignee: Bruno Cadonna > Make group metadata available when a new assignment is set in async Kafka > consumer > -- > > Key: KAFKA-16285 > URL: https://issues.apache.org/jira/browse/KAFKA-16285 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Labels: kip-848-client-support > > Currently, the new async Kafka consumer sends an event from the background > thread to the application thread when the group metadata is updated. Group > metadata is updated when the background thread receives a new assignment. > More specifically, the member epoch is updated each time a new assignment is > received, and the member ID is updated with the first assignment. > In contrast to the group metadata update, the assignment is directly set in > the subscription without sending an update event from the background thread > to the application thread. That means that there is a delay between the > application thread being aware of the update to the assignment and the > application thread being aware of the update to the group metadata. This > behavior differs from the legacy consumer, where the assignment and > the group metadata are updated at the same time. > We should make the update to the group metadata available to the application > thread when the update to the assignment is made available to the application > thread, so that the assignment and group metadata are in sync. > For example, {{producer.sendOffsetsToTransaction(offsetsToCommit, > groupMetadata);}} benefits from this improvement because if the offsets to > commit are consistent with the current assignment, the group metadata > would be too. Currently, that is not guaranteed.
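One way to keep the two views in sync is to publish the assignment and the group metadata together as a single immutable snapshot, so the application thread can never observe one without the other. A minimal sketch, assuming hypothetical `AssignmentSnapshot` and `SharedAssignmentState` classes and plain strings in place of `TopicPartition`; it illustrates the requirement and is not necessarily the approach chosen for KAFKA-16285:

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot: assignment and group metadata travel together.
final class AssignmentSnapshot {
    final Set<String> assignedPartitions; // e.g. "orders-0"; stand-in for TopicPartition
    final String memberId;
    final int memberEpoch;

    AssignmentSnapshot(Set<String> assignedPartitions, String memberId, int memberEpoch) {
        this.assignedPartitions = Set.copyOf(assignedPartitions);
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }
}

// Hypothetical state shared by the background and application threads.
final class SharedAssignmentState {
    // Placeholder initial values before the first assignment arrives.
    private final AtomicReference<AssignmentSnapshot> ref =
            new AtomicReference<>(new AssignmentSnapshot(Set.of(), "", 0));

    // Background thread: one atomic publish of both assignment and metadata.
    void publish(Set<String> partitions, String memberId, int memberEpoch) {
        ref.set(new AssignmentSnapshot(partitions, memberId, memberEpoch));
    }

    // Application thread: both values are read from the same snapshot.
    AssignmentSnapshot current() {
        return ref.get();
    }
}
```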
[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses
[ https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818919#comment-17818919 ] Ayoub Omari commented on KAFKA-14747: - Picking this up since [~kma] didn't respond :)
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on PR #15357: URL: https://github.com/apache/kafka/pull/15357#issuecomment-1954655740 Thanks for the comments, @lucasbru; all addressed.
Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]
cadonna commented on code in PR #15369: URL: https://github.com/apache/kafka/pull/15369#discussion_r1496180609
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -727,6 +732,17 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             }
         }
+    private boolean isGenerationKnown() {
Review Comment: I could probably do something like:
```java
if (subscriptions.hasAutoAssignedPartitions()) {
    return groupMetadata.filter(g -> g.generationId() != JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent();
}
return true;
```
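A self-contained version of the proposed check, for experimentation. Assumptions: `subscriptions.hasAutoAssignedPartitions()` becomes a boolean parameter, `GroupMeta` is a hypothetical stand-in for `ConsumerGroupMetadata`, and `UNKNOWN_GENERATION_ID` mirrors `JoinGroupRequest.UNKNOWN_GENERATION_ID` (-1):

```java
import java.util.Optional;

// Hypothetical stand-in for ConsumerGroupMetadata; only the generation id matters here.
final class GroupMeta {
    final int generationId;

    GroupMeta(int generationId) {
        this.generationId = generationId;
    }
}

final class GenerationCheck {
    // Mirrors JoinGroupRequest.UNKNOWN_GENERATION_ID.
    static final int UNKNOWN_GENERATION_ID = -1;

    static boolean isGenerationKnown(boolean hasAutoAssignedPartitions, Optional<GroupMeta> groupMetadata) {
        if (hasAutoAssignedPartitions) {
            // Group-managed assignment: require metadata with a real generation.
            return groupMetadata.filter(g -> g.generationId != UNKNOWN_GENERATION_ID).isPresent();
        }
        // Manually assigned partitions: never block poll() on group metadata.
        return true;
    }
}
```

With manually assigned partitions the check always passes, so `poll()` is never held back waiting for group metadata in that mode.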
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496179477
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -1147,15 +1202,15 @@ List drain(final long currentTimeMs) {
          * futures with a TimeoutException.
          */
         private void failAndRemoveExpiredCommitRequests(final long currentTimeMs) {
-            unsentOffsetCommits.removeIf(req -> req.maybeExpire(currentTimeMs));
+            unsentOffsetCommits.forEach(req -> req.maybeExpire(currentTimeMs));
Review Comment: Definitely, good catch, fixed.
Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]
cadonna commented on code in PR #15369: URL: https://github.com/apache/kafka/pull/15369#discussion_r1496167714 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -727,6 +732,17 @@ public ConsumerRecords poll(final Duration timeout) { } } +private boolean isGenerationKnown() { Review Comment: I am not sure this is correct, because I can use user assigned partitions but still use a group ID to manage the offsets broker-side, if I remember correctly.
Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]
cadonna commented on code in PR #15369: URL: https://github.com/apache/kafka/pull/15369#discussion_r1496159485 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -708,14 +709,18 @@ public ConsumerRecords poll(final Duration timeout) { wakeupTrigger.maybeTriggerWakeup(); updateAssignmentMetadataIfNeeded(timer); -final Fetch fetch = pollForFetches(timer); -if (!fetch.isEmpty()) { -if (fetch.records().isEmpty()) { -log.trace("Returning empty records from `poll()` " +if (isGenerationKnown()) { Review Comment: The busy loop should not be too bad, since the group metadata update event is added to the queue at the same time as the new assignment is set in the membership manager. Moreover, this is a temporary solution until https://issues.apache.org/jira/browse/KAFKA-16285 is done.
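The review asks where `poll()` blocks while the generation is unknown. One generic alternative to spinning is a bounded wait: the application thread waits on a monitor until the background thread publishes a known generation, or until the remaining poll time runs out. This is not what the PR does. `GenerationGate` is a hypothetical helper, and the `-1` sentinel is an assumption.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical alternative to spinning: the application thread waits, bounded by the
// remaining poll time, until the background thread publishes a known generation.
final class GenerationGate {
    private static final int UNKNOWN_GENERATION_ID = -1; // assumed sentinel
    private int generationId = UNKNOWN_GENERATION_ID;

    // Background thread: publish the generation from a group metadata update.
    synchronized void update(int newGenerationId) {
        generationId = newGenerationId;
        notifyAll(); // wake any thread blocked in awaitKnownGeneration()
    }

    // Application thread: true if a known generation is available within timeoutMs,
    // false on timeout or interruption. The thread sleeps instead of busy looping.
    synchronized boolean awaitKnownGeneration(long timeoutMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (generationId == UNKNOWN_GENERATION_ID) {
            long remainingNs = deadlineNs - System.nanoTime();
            if (remainingNs <= 0) {
                return false;
            }
            try {
                // Releases the monitor while waiting; the loop re-checks the condition.
                TimeUnit.NANOSECONDS.timedWait(this, remainingNs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}

final class GenerationGateDemo {
    public static void main(String[] args) throws InterruptedException {
        GenerationGate gate = new GenerationGate();
        System.out.println(gate.awaitKnownGeneration(50)); // false: nothing published yet
        Thread background = new Thread(() -> gate.update(1));
        background.start();
        System.out.println(gate.awaitKnownGeneration(5_000)); // true once update(1) has run
        background.join();
    }
}
```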
[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818903#comment-17818903 ] Justine Olshan commented on KAFKA-16282: I am also happy to review the KIP and PRs :) > Allow to get last stable offset (LSO) in kafka-get-offsets.sh > - > > Key: KAFKA-16282 > URL: https://issues.apache.org/jira/browse/KAFKA-16282 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Ahmed Sobeh >Priority: Major > Labels: need-kip, newbie, newbie++ > > Currently, when using `kafka-get-offsets.sh` to get the offset by time, we have these choices:
> {code:java}
> --time /                  timestamp of the offsets before that.
> -1 or latest /            [Note: No offset is returned, if the
> -2 or earliest /          timestamp greater than recently
> -3 or max-timestamp /     committed record timestamp is
> -4 or earliest-local /    given.] (default: latest)
> -5 or latest-tiered
> {code}
> For the "latest" option, it always returns the "high watermark" because we always send the default option: *IsolationLevel.READ_UNCOMMITTED*. It would be good if the command could also return the last stable offset (LSO) for transaction support, that is, by sending the request with *IsolationLevel.READ_COMMITTED*. -- This message was sent by Atlassian Jira (v8.20.10#820010)
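This sketch shows why READ_UNCOMMITTED and READ_COMMITTED give different "latest" offsets, using a simplified model of a single partition. The last stable offset (LSO) is the first offset of the earliest transaction that is still open, or the high watermark when no transaction is open. `StableOffset` and its inputs are hypothetical. The tool itself would get the value from the broker rather than compute it.

```java
import java.util.List;

// Simplified model: the LSO is the first offset of the earliest still-open transaction,
// or the high watermark (HW) when none is open. READ_UNCOMMITTED "latest" corresponds
// to the HW; READ_COMMITTED "latest" corresponds to the LSO.
final class StableOffset {
    static long lastStableOffset(long highWatermark, List<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long firstOffset : openTxnFirstOffsets) {
            lso = Math.min(lso, firstOffset);
        }
        return lso;
    }

    public static void main(String[] args) {
        // HW = 100, one transaction still open since offset 90.
        System.out.println(lastStableOffset(100, List.of(90L))); // 90
        // No open transactions: LSO == HW.
        System.out.println(lastStableOffset(100, List.of()));    // 100
    }
}
```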
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1496132259 ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig, def handleListTransactions( filteredProducerIds: Set[Long], -filteredStates: Set[String] +filteredStates: Set[String], +durationFilter: Long = -1 Review Comment: I'm not sure what he is supposed to change 😅
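The diff adds a `durationFilter: Long = -1` parameter to `handleListTransactions`. As we understand KIP-994, this filter selects transactions that have been running longer than a given number of milliseconds. The sketch assumes that reading, and also assumes a negative value such as the `-1` default means "no filter". It is in Java rather than the coordinator's Scala, and `TxnInfo` and `filterByDuration` are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified view of an ongoing transaction.
final class TxnInfo {
    final String transactionalId;
    final long startTimeMs;

    TxnInfo(String transactionalId, long startTimeMs) {
        this.transactionalId = transactionalId;
        this.startTimeMs = startTimeMs;
    }
}

final class DurationFilterSketch {
    // Assumed semantics: a negative durationFilterMs (such as the -1 default) disables
    // filtering; otherwise keep only transactions running longer than durationFilterMs.
    static List<TxnInfo> filterByDuration(List<TxnInfo> txns, long durationFilterMs, long nowMs) {
        if (durationFilterMs < 0) {
            return txns;
        }
        return txns.stream()
            .filter(t -> nowMs - t.startTimeMs > durationFilterMs)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TxnInfo> txns = List.of(new TxnInfo("txn-a", 0), new TxnInfo("txn-b", 900));
        System.out.println(filterByDuration(txns, -1, 1_000).size());  // 2: filter disabled
        System.out.println(filterByDuration(txns, 500, 1_000).size()); // 1: only txn-a
    }
}
```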
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496101641 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ -private CompletableFuture maybeAutoCommit(final Map offsets, -final Optional expirationTimeMs, -boolean checkInterval, -boolean retryOnStaleEpoch) { -if (!autoCommitEnabled()) { -log.debug("Skipping auto-commit because auto-commit config is not enabled."); -return CompletableFuture.completedFuture(null); -} - +private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); -if (checkInterval && !autocommit.shouldAutoCommit()) { -return CompletableFuture.completedFuture(null); +CompletableFuture> result; +if (requestState.offsets.isEmpty()) { +result = CompletableFuture.completedFuture(Collections.emptyMap()); +} else { +autocommit.setInflightCommitStatus(true); +OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); +result = request.future; +result.whenComplete(autoCommitCallback(request.offsets)); } - -autocommit.resetTimer(); -autocommit.setInflightCommitStatus(true); -CompletableFuture result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) -.whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. 
+ * + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ -public CompletableFuture maybeAutoCommitAllConsumedAsync() { -if (!autoCommitEnabled()) { -// Early return to
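The new `requestAutoCommit` returns an already-completed future when there are no offsets to commit. Otherwise it marks a commit as in flight, enqueues the request, and attaches a completion callback. The Javadoc adds that a retriable failure resets the auto-commit timer with exponential backoff, while a non-retriable failure changes nothing. Below is a minimal model of that flow. `AutoCommitSketch`, `RetriableError`, the backoff constants, and the `String` partition keys are all assumptions, because the real `autoCommitCallback` and backoff policy are not shown in the truncated diff.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a retriable failure (in Kafka, a RetriableException).
final class RetriableError extends RuntimeException {
    RetriableError(String message) {
        super(message);
    }
}

// Simplified model of requestAutoCommit(); String keys stand in for TopicPartition.
final class AutoCommitSketch {
    static final long BASE_BACKOFF_MS = 100;  // assumed values, for illustration only
    static final long MAX_BACKOFF_MS = 1_000;

    final Queue<CompletableFuture<Map<String, Long>>> pendingRequests = new ArrayDeque<>();
    boolean inflight;
    int retriableFailures;
    long nextCommitDelayMs; // the caller would set this to the auto-commit interval

    CompletableFuture<Map<String, Long>> requestAutoCommit(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // Nothing to commit: no request is generated.
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        inflight = true;
        CompletableFuture<Map<String, Long>> result = new CompletableFuture<>();
        pendingRequests.add(result); // sent on a later poll()
        // As in the diff, the callback is attached but the original future is returned.
        result.whenComplete((committed, error) -> {
            inflight = false;
            if (error instanceof RetriableError) {
                // Retriable failure: reset the timer with exponential backoff.
                retriableFailures++;
                nextCommitDelayMs = Math.min(MAX_BACKOFF_MS,
                    BASE_BACKOFF_MS << Math.min(retriableFailures - 1, 10));
            } else if (error == null) {
                retriableFailures = 0;
            }
            // Non-retriable failure: no action; the next commit waits for the interval.
        });
        return result;
    }
}
```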
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496091256 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -719,26 +846,27 @@ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final Abstr private void handleClientResponse(final ClientResponse response, final Throwable error, - final long currentTimeMs) { + final long requestCompletionTimeMs) { try { if (error == null) { onResponse(response); } else { log.debug("{} completed with error", requestDescription(), error); -handleCoordinatorDisconnect(error, currentTimeMs); -if (error instanceof RetriableException) { -maybeRetry(currentTimeMs, error); -} else { -future().completeExceptionally(error); -} +onFailedAttempt(requestCompletionTimeMs); +handleCoordinatorDisconnect(error, requestCompletionTimeMs); +future().completeExceptionally(error); } } catch (Throwable t) { -log.error("Unexpected error handling response for ", requestDescription(), t); +log.error("Unexpected error handling response for {}", requestDescription(), t); future().completeExceptionally(t); } } abstract void onResponse(final ClientResponse response); + +abstract boolean retryTimeoutExpired(long currentTimeMs); Review Comment: Good point, done. I probably missed that because, in the case of commits, the expirationTimeout does not apply to all requests. But I do agree with you that the expiration belongs with the retry logic, so it fits well at the RetriableRequest level (just as an Optional).
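The comment moves the expiration into the retriable-request level as an `Optional`, because not every commit request has a retry timeout. Here is a sketch of that bookkeeping. It assumes, following the old Javadoc's "If not present, the request will be sent but not retried", that a missing expiration counts as already expired. `RetriableRequestSketch` and `canRetry` are hypothetical. Only `onFailedAttempt` and `retryTimeoutExpired` echo names from the diff.

```java
import java.util.Optional;

// Hypothetical sketch: retry bookkeeping owned by the retriable request, with an
// Optional expiration because not every commit request carries a retry timeout.
final class RetriableRequestSketch {
    final Optional<Long> expirationTimeMs;
    int attempts;
    long lastFailedAttemptMs = -1;

    RetriableRequestSketch(Optional<Long> expirationTimeMs) {
        this.expirationTimeMs = expirationTimeMs;
    }

    // Record a failed attempt at the time the response (or error) was received.
    void onFailedAttempt(long requestCompletionTimeMs) {
        attempts++;
        lastFailedAttemptMs = requestCompletionTimeMs;
    }

    // Assumption: with no expiration the request is sent once and never retried,
    // so its retry window counts as already expired.
    boolean retryTimeoutExpired(long currentTimeMs) {
        return expirationTimeMs.map(exp -> currentTimeMs >= exp).orElse(true);
    }

    // Retry only retriable errors, and only while the retry window is open.
    boolean canRetry(boolean retriableError, long currentTimeMs) {
        return retriableError && !retryTimeoutExpired(currentTimeMs);
    }

    public static void main(String[] args) {
        RetriableRequestSketch withTimeout = new RetriableRequestSketch(Optional.of(1_000L));
        withTimeout.onFailedAttempt(400);
        System.out.println(withTimeout.canRetry(true, 400));   // true: inside the window
        System.out.println(withTimeout.canRetry(true, 1_000)); // false: window expired
        RetriableRequestSketch noTimeout = new RetriableRequestSketch(Optional.empty());
        System.out.println(noTimeout.canRetry(true, 0));       // false: never retried
    }
}
```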
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496065426 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ -private CompletableFuture maybeAutoCommit(final Map offsets, -final Optional expirationTimeMs, -boolean checkInterval, -boolean retryOnStaleEpoch) { -if (!autoCommitEnabled()) { -log.debug("Skipping auto-commit because auto-commit config is not enabled."); -return CompletableFuture.completedFuture(null); -} - +private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); -if (checkInterval && !autocommit.shouldAutoCommit()) { -return CompletableFuture.completedFuture(null); +CompletableFuture> result; +if (requestState.offsets.isEmpty()) { +result = CompletableFuture.completedFuture(Collections.emptyMap()); +} else { +autocommit.setInflightCommitStatus(true); +OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); +result = request.future; +result.whenComplete(autoCommitCallback(request.offsets)); } - -autocommit.resetTimer(); -autocommit.setInflightCommitStatus(true); -CompletableFuture result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) -.whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. 
+ * + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ -public CompletableFuture maybeAutoCommitAllConsumedAsync() { -if (!autoCommitEnabled()) { -// Early return to
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496060942 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ -private CompletableFuture maybeAutoCommit(final Map offsets, -final Optional expirationTimeMs, -boolean checkInterval, -boolean retryOnStaleEpoch) { -if (!autoCommitEnabled()) { -log.debug("Skipping auto-commit because auto-commit config is not enabled."); -return CompletableFuture.completedFuture(null); -} - +private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); -if (checkInterval && !autocommit.shouldAutoCommit()) { -return CompletableFuture.completedFuture(null); +CompletableFuture> result; +if (requestState.offsets.isEmpty()) { +result = CompletableFuture.completedFuture(Collections.emptyMap()); +} else { +autocommit.setInflightCommitStatus(true); +OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); +result = request.future; +result.whenComplete(autoCommitCallback(request.offsets)); } - -autocommit.resetTimer(); -autocommit.setInflightCommitStatus(true); -CompletableFuture result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) -.whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. 
+ * + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ -public CompletableFuture maybeAutoCommitAllConsumedAsync() { -if (!autoCommitEnabled()) { -// Early return to
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496049319 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) { } CommitRequestManager manager = requestManagers.commitRequestManager.get(); -Optional expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout); -event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false)); +CompletableFuture commitResult; +if (event.retryTimeoutMs().isPresent()) { Review Comment: Say no more, I was leaning towards that too (just a bit hesitant to introduce more changes in this already big PR), but I totally agree that it's conceptually clearer, and the result is nice. Done. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1344,8 +1343,8 @@ public void commitSync(Map offsets, Duration long commitStart = time.nanoseconds(); try { Timer requestTimer = time.timer(timeout.toMillis()); -// Commit with a timer to control how long the request should be retried until it -// gets a successful response or non-retriable error. +// Commit with a retry timeout (the commit request will be retried until it gets a +// successful response, non-retriable error, or the timeout expires) CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis())); Review Comment: Makes sense, done.
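The new `commitSync` comment describes a commit that is retried until it gets a successful response, hits a non-retriable error, or the timeout expires. The event processor now branches on `event.retryTimeoutMs().isPresent()`. Its else branch is not visible in the hunk, so this sketch assumes a single attempt there. `RetriableFailure`, `CommitTimedOut`, and `RetryUntilDeadline` are hypothetical stand-ins, not Kafka classes.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-ins for Kafka's RetriableException and TimeoutException.
final class RetriableFailure extends RuntimeException {
    RetriableFailure(String message) { super(message); }
}

final class CommitTimedOut extends RuntimeException {
    CommitTimedOut(String message, Throwable cause) { super(message, cause); }
}

final class RetryUntilDeadline {
    // Retries until the attempt succeeds, throws a non-retriable exception (propagated
    // as-is), or the retry timeout expires (CommitTimedOut).
    static <T> T callWithRetryTimeout(Supplier<T> attempt, long retryTimeoutMs, long backoffMs) {
        long deadlineMs = System.currentTimeMillis() + retryTimeoutMs;
        while (true) {
            try {
                return attempt.get();
            } catch (RetriableFailure e) {
                long remainingMs = deadlineMs - System.currentTimeMillis();
                if (remainingMs <= 0) {
                    throw new CommitTimedOut("Retry timeout of " + retryTimeoutMs + " ms expired", e);
                }
                sleepQuietly(Math.min(backoffMs, remainingMs));
            }
        }
    }

    // Assumed mirror of the branch on event.retryTimeoutMs(): retry with a timeout, else try once.
    static <T> T commit(Supplier<T> attempt, Optional<Long> retryTimeoutMs) {
        return retryTimeoutMs.isPresent()
            ? callWithRetryTimeout(attempt, retryTimeoutMs.get(), 10)
            : attempt.get();
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = commit(() -> {
            if (++calls[0] < 3) throw new RetriableFailure("coordinator not available");
            return "committed";
        }, Optional.of(1_000L));
        System.out.println(result + " after " + calls[0] + " attempts"); // committed after 3 attempts
    }
}
```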
[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818856#comment-17818856 ] Hector Geraldino commented on KAFKA-16223: -- One thing that worked for me when migrating the WorkerSinkTaskTest was creating a separate *WorkerSinkTaskMockTest* test class and splitting the migration into smaller batches. Is that something you'd consider? > Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest > --- > > Key: KAFKA-16223 > URL: https://issues.apache.org/jira/browse/KAFKA-16223 > Project: Kafka > Issue Type: Sub-task > Components: connect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor >
Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
msn-tldr commented on PR #15385: URL: https://github.com/apache/kafka/pull/15385#issuecomment-1954448982 All the Jenkins test failures are in unrelated tests that already have a history of flakiness.
Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]
cadonna commented on code in PR #15369: URL: https://github.com/apache/kafka/pull/15369#discussion_r1495859702 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1467,6 +1483,7 @@ public void unsubscribe() { } catch (TimeoutException e) { log.error("Failed while waiting for the unsubscribe event to complete"); } +groupMetadata = initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty()); Review Comment: Done!
Re: [PR] KAFKA-16284: Fix performance regression in RocksDB [kafka]
lucasbru commented on PR #15393: URL: https://github.com/apache/kafka/pull/15393#issuecomment-1954244163 @cadonna could you take a look?
Re: [PR] KAFKA-16194: Do not return records from poll if group metadata unknown [kafka]
lucasbru commented on code in PR #15369: URL: https://github.com/apache/kafka/pull/15369#discussion_r1495811911 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -98,6 +99,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; Review Comment: not used ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -727,6 +732,17 @@ public ConsumerRecords poll(final Duration timeout) { } } +private boolean isGenerationKnown() { Review Comment: How about this one-liner ``` return groupMetadata.filter(g -> g.generationId() != JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent(); ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -708,14 +709,18 @@ public ConsumerRecords poll(final Duration timeout) { wakeupTrigger.maybeTriggerWakeup(); updateAssignmentMetadataIfNeeded(timer); -final Fetch fetch = pollForFetches(timer); -if (!fetch.isEmpty()) { -if (fetch.records().isEmpty()) { -log.trace("Returning empty records from `poll()` " +if (isGenerationKnown()) { Review Comment: If this check is false... where do we block? It seems like we busy loop with 100% CPU, but maybe I'm wrong ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1467,6 +1483,7 @@ public void unsubscribe() { } catch (TimeoutException e) { log.error("Failed while waiting for the unsubscribe event to complete"); } +groupMetadata = initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty()); Review Comment: This looks like an independent fix? Maybe mention in the PR description
[PR] KAFKA-16284: Fix performance regression in RocksDB [kafka]
lucasbru opened a new pull request, #15393: URL: https://github.com/apache/kafka/pull/15393 A performance regression introduced in commit 5bc3aa428067dff1f2b9075ff5d1351fb05d4b10 reduces RocksDB write performance by ~3x. The bug is that we fail to pass the `WriteOptions` that disable the write-ahead log into the DB accessor. For testing, we measured the time to write 1 million records into a RocksDB store, repeated 10 times with a new store each time:
* Before 5bc3aa428067dff1f2b9075ff5d1351fb05d4b10: 7954ms, 12933ms
* After 5bc3aa428067dff1f2b9075ff5d1351fb05d4b10: 30345ms, 31992ms
* After 5bc3aa428067dff1f2b9075ff5d1351fb05d4b10 with this fix: 8040ms, 10563ms
* On current trunk with this fix: 9508ms, 10441ms

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
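This is a quick arithmetic check of the "~3x" figure, using only the two timings listed per configuration in the PR description. Averaging the two timings per configuration is our own choice, and `RegressionFactor` is a throwaway helper.

```java
// Worked check of the "~3x" figure from the reported timings (milliseconds).
// Averaging the two timings listed per configuration is an assumption of this sketch.
final class RegressionFactor {
    static double mean(long... timingsMs) {
        long sum = 0;
        for (long t : timingsMs) {
            sum += t;
        }
        return (double) sum / timingsMs.length;
    }

    public static void main(String[] args) {
        double before = mean(7954, 12933);  // 10443.5 ms
        double after = mean(30345, 31992);  // 31168.5 ms
        double fixed = mean(8040, 10563);   // 9301.5 ms
        System.out.printf("regression: %.2fx slower%n", after / before);               // 2.98x slower
        System.out.printf("with fix:   %.2fx of the original time%n", fixed / before); // 0.89x
    }
}
```

The averaged slowdown comes out at about 2.98x, which matches the "~3x" claim. With the fix, the timings fall back into the pre-regression range.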
[jira] [Assigned] (KAFKA-16284) Performance regression in RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-16284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16284: -- Assignee: Lucas Brutschy > Performance regression in RocksDB > - > > Key: KAFKA-16284 > URL: https://issues.apache.org/jira/browse/KAFKA-16284 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > In benchmarks, we are noticing a regression in the performance of `RocksDBStore`. The regression happens between these two commits:
> {code:java}
> trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z
> trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z
> {code}
> The regression can be reproduced by the following test:
> {code:java}
> package org.apache.kafka.streams.state.internals;
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.processor.StateStoreContext;
> import org.apache.kafka.test.InternalMockProcessorContext;
> import org.apache.kafka.test.MockRocksDbConfigSetter;
> import org.apache.kafka.test.StreamsTestUtils;
> import org.apache.kafka.test.TestUtils;
> import org.junit.Before;
> import org.junit.Test;
>
> import java.io.File;
> import java.nio.ByteBuffer;
> import java.util.Properties;
>
> public class RocksDBStorePerfTest {
>
>     InternalMockProcessorContext context;
>     RocksDBStore rocksDBStore;
>
>     final static String DB_NAME = "db-name";
>     final static String METRICS_SCOPE = "metrics-scope";
>
>     RocksDBStore getRocksDBStore() {
>         return new RocksDBStore(DB_NAME, METRICS_SCOPE);
>     }
>
>     @Before
>     public void setUp() {
>         final Properties props = StreamsTestUtils.getStreamsConfig();
>         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
>         File dir = TestUtils.tempDirectory();
>         context = new InternalMockProcessorContext<>(
>             dir,
>             Serdes.String(),
>             Serdes.String(),
>             new StreamsConfig(props)
>         );
>     }
>
>     @Test
>     public void testPerf() {
>         long start = System.currentTimeMillis();
>         for (int i = 0; i < 10; i++) {
>             System.out.println("Iteration: " + i + " Time: " + (System.currentTimeMillis() - start));
>             RocksDBStore rocksDBStore = getRocksDBStore();
>             rocksDBStore.init((StateStoreContext) context, rocksDBStore);
>             for (int j = 0; j < 100; j++) {
>                 rocksDBStore.put(new Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes());
>             }
>             rocksDBStore.close();
>         }
>         long end = System.currentTimeMillis();
>         System.out.println("Time: " + (end - start));
>     }
> }
> {code}
> I have isolated the regression to commit [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. On my machine, the test takes ~8 seconds before [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10] and ~30 seconds after [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10].
Re: [PR] KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0 [kafka]
dejan2609 commented on PR #10626: URL: https://github.com/apache/kafka/pull/10626#issuecomment-1954154725 Hi @mimaison! I will return to this once Kafka drops support for Java 8. Reference JIRA ticket: **_KIP-750: Drop support for Java 8 in Kafka 4.0 (deprecate in 3.0)_** https://issues.apache.org/jira/browse/KAFKA-12894
[jira] [Updated] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer
[ https://issues.apache.org/jira/browse/KAFKA-16285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-16285: -- Description: Currently, the new async Kafka consumer sends an event from the background thread to the application thread when the group metadata is updated. Group metadata is updated when the background thread receives a new assignment. More specifically, the member epoch is updated each time a new assignment is received, and the member ID is updated with the first assignment. In contrast to the group metadata update, the assignment is set directly in the subscription without sending an update event from the background thread to the application thread. That means there is a delay between the application thread becoming aware of the updated assignment and becoming aware of the updated group metadata. This behavior differs from the legacy consumer, where the assignment and the group metadata are updated at the same time. We should make the update to the group metadata available to the application thread at the same time as the update to the assignment, so that the assignment and the group metadata are in sync. For example, {{producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);}} benefits from this improvement: if the offsets to commit are consistent with the current assignment, the group metadata would be consistent as well. Currently, that is not guaranteed. > Make group metadata available when a new assignment is set in async Kafka consumer > > Key: KAFKA-16285 > URL: https://issues.apache.org/jira/browse/KAFKA-16285 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Bruno Cadonna >Priority: Major > Labels: kip-848-client-support
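KAFKA-16285 asks that the assignment and the group metadata (member ID and member epoch) become visible to the application thread together. One common way to get that guarantee, sketched here under our own assumptions rather than taken from the async consumer's code, is to publish both as a single immutable snapshot through an `AtomicReference`. The class and field names are hypothetical, and partitions are plain strings instead of `TopicPartition` to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch (not Kafka code): the background thread publishes the assignment and the
 * group metadata it came with as one immutable snapshot, so the application thread never
 * observes a new assignment paired with a stale member ID or epoch.
 */
final class AssignmentSnapshotHolder {

    /** Immutable pairing of an assignment with the member ID and epoch that produced it. */
    static final class Snapshot {
        final Set<String> assignedPartitions; // "topic-partition" strings stand in for TopicPartition
        final String memberId;
        final int memberEpoch;

        Snapshot(Set<String> assignedPartitions, String memberId, int memberEpoch) {
            // Defensive copy: later changes to the caller's set do not leak into the snapshot.
            this.assignedPartitions = Collections.unmodifiableSet(new HashSet<>(assignedPartitions));
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }
    }

    // Placeholder state before any assignment has been received (values are arbitrary).
    private final AtomicReference<Snapshot> current =
            new AtomicReference<>(new Snapshot(Collections.<String>emptySet(), "", -1));

    /** Background thread: called once per received assignment. */
    void publish(Set<String> assignedPartitions, String memberId, int memberEpoch) {
        current.set(new Snapshot(assignedPartitions, memberId, memberEpoch));
    }

    /** Application thread: assignment and metadata always come from the same publish() call. */
    Snapshot read() {
        return current.get();
    }
}
```

An application-thread caller that needs both values, such as code preparing `producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata)`, would take them from one `read()` call, so the metadata cannot lag behind the assignment it is paired with. Swapping the whole reference keeps the read path lock-free at the cost of allocating one snapshot per assignment.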
[jira] [Created] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer
Bruno Cadonna created KAFKA-16285: - Summary: Make group metadata available when a new assignment is set in async Kafka consumer Key: KAFKA-16285 URL: https://issues.apache.org/jira/browse/KAFKA-16285 Project: Kafka Issue Type: Improvement Components: clients, consumer Reporter: Bruno Cadonna Currently, the new async Kafka consumer sends an event from the background thread to the application thread when the group metadata is updated. Group metadata is updated when the background thread receives a new assignment. More specifically, the member epoch is updated each time a new assignment is received, and the member ID is updated with the first assignment. In contrast to the group metadata update, the assignment is set directly in the subscription without sending an update event from the background thread to the application thread. That means there is a delay between the application thread becoming aware of the updated assignment and becoming aware of the updated group metadata. This behavior differs from the legacy consumer, where the assignment and the group metadata are updated at the same time. We should make the update to the group metadata available to the application thread at the same time as the update to the assignment, so that the assignment and the group metadata are in sync.
[jira] [Commented] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic
[ https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818794#comment-17818794 ] Luke Chen commented on KAFKA-16283: --- [~alivshits], any thoughts on this? > RoundRobinPartitioner will only send to half of the partitions in a topic > > Key: KAFKA-16283 > URL: https://issues.apache.org/jira/browse/KAFKA-16283 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Priority: Major
[jira] [Created] (KAFKA-16284) Performance regression in RocksDB
Lucas Brutschy created KAFKA-16284: -- Summary: Performance regression in RocksDB Key: KAFKA-16284 URL: https://issues.apache.org/jira/browse/KAFKA-16284 Project: Kafka Issue Type: Task Components: streams Reporter: Lucas Brutschy In benchmarks, we are noticing a performance regression in `RocksDBStore`. The regression happens between these two commits:
{code:java}
trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z
trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z
{code}
The regression can be reproduced by the following test:
{code:java}
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Properties;

public class RocksDBStorePerfTest {
    InternalMockProcessorContext context;
    RocksDBStore rocksDBStore;
    final static String DB_NAME = "db-name";
    final static String METRICS_SCOPE = "metrics-scope";

    RocksDBStore getRocksDBStore() {
        return new RocksDBStore(DB_NAME, METRICS_SCOPE);
    }

    @Before
    public void setUp() {
        final Properties props = StreamsTestUtils.getStreamsConfig();
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
        File dir = TestUtils.tempDirectory();
        context = new InternalMockProcessorContext<>(
            dir,
            Serdes.String(),
            Serdes.String(),
            new StreamsConfig(props)
        );
    }

    @Test
    public void testPerf() {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            System.out.println("Iteration: " + i + " Time: " + (System.currentTimeMillis() - start));
            RocksDBStore rocksDBStore = getRocksDBStore();
            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
            for (int j = 0; j < 100; j++) {
                rocksDBStore.put(new Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes());
            }
            rocksDBStore.close();
        }
        long end = System.currentTimeMillis();
        System.out.println("Time: " + (end - start));
    }
}
{code}
I have isolated the regression to commit [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. On my machine, the test takes ~8 seconds before [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10] and ~30 seconds after [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10].
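The reproduction test prints the cumulative time since the start of the run before each iteration. If per-iteration numbers were wanted instead, for example to check whether every store open/put/close cycle slows down equally, a small helper like the one below could time each iteration separately with `System.nanoTime()`. It is a hypothetical sketch, not part of the issue or of Kafka's test utilities.

```java
import java.util.Arrays;
import java.util.function.IntConsumer;

/** Hypothetical benchmarking helper (not part of the issue): times each iteration on its own. */
final class IterationTimer {

    private IterationTimer() { }

    /** Runs {@code workload} once per iteration and returns each run's duration in milliseconds. */
    static long[] timeIterations(int iterations, IntConsumer workload) {
        long[] durationsMs = new long[iterations];
        for (int i = 0; i < iterations; i++) {
            long start = System.nanoTime(); // monotonic clock, unlike currentTimeMillis()
            workload.accept(i);
            durationsMs[i] = (System.nanoTime() - start) / 1_000_000L;
        }
        return durationsMs;
    }

    /** Median duration; one slow warm-up iteration moves it less than it moves the mean. */
    static long median(long[] durationsMs) {
        if (durationsMs.length == 0) {
            throw new IllegalArgumentException("no durations recorded");
        }
        long[] sorted = durationsMs.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
    }
}
```

In `testPerf`, the body of the outer loop (create the store, `init`, the 100 `put` calls, `close`) would be passed as the `IntConsumer`, and the median per iteration could then be compared across the two commits.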
Re: [PR] [WIP] KAFKA-13566: producer exponential backoff implementation for KIP-580 [kafka]
mimaison commented on PR #11646: URL: https://github.com/apache/kafka/pull/11646#issuecomment-1954090264 Done in https://github.com/apache/kafka/pull/14111, closing this PR
Re: [PR] [WIP] KAFKA-13566: producer exponential backoff implementation for KIP-580 [kafka]
mimaison closed pull request #11646: [WIP] KAFKA-13566: producer exponential backoff implementation for KIP-580 URL: https://github.com/apache/kafka/pull/11646
Re: [PR] [WIP] KAFKA-13567: adminClient exponential backoff implementation for KIP-580 [kafka]
mimaison closed pull request #11647: [WIP] KAFKA-13567: adminClient exponential backoff implementation for KIP-580 URL: https://github.com/apache/kafka/pull/11647
Re: [PR] [WIP] KAFKA-13567: adminClient exponential backoff implementation for KIP-580 [kafka]
mimaison commented on PR #11647: URL: https://github.com/apache/kafka/pull/11647#issuecomment-1954090079 Done in https://github.com/apache/kafka/pull/14111, closing this PR
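The two KIP-580 pull requests closed above (KAFKA-13566 for the producer and KAFKA-13567 for the admin client) are about exponential retry backoff. As we understand KIP-580, the delay grows by a factor of 2 per consecutive failure starting from `retry.backoff.ms`, is capped by `retry.backoff.max.ms`, and is randomized with jitter. The sketch below assumes exactly that shape; it is not the code merged in apache/kafka#14111, and the class name and rounding choices are our own.

```java
import java.util.Random;

/**
 * Sketch of an exponential retry backoff in the spirit of KIP-580 (assumed formula, not Kafka's
 * implementation): min(maxMs, baseMs * 2^(failures - 1) * factor), with factor drawn
 * uniformly from [1 - jitter, 1 + jitter).
 */
final class BackoffSketch {
    private final long baseMs;   // plays the role of retry.backoff.ms
    private final long maxMs;    // plays the role of retry.backoff.max.ms
    private final double jitter; // e.g. 0.2 for a +/-20% random factor
    private final Random random;

    BackoffSketch(long baseMs, long maxMs, double jitter, Random random) {
        if (baseMs <= 0 || maxMs < baseMs) {
            throw new IllegalArgumentException("require 0 < baseMs <= maxMs");
        }
        if (jitter < 0.0 || jitter >= 1.0) {
            throw new IllegalArgumentException("require 0 <= jitter < 1");
        }
        this.baseMs = baseMs;
        this.maxMs = maxMs;
        this.jitter = jitter;
        this.random = random;
    }

    /** Delay before the next attempt after {@code failures} consecutive failures (>= 1). */
    long backoffMs(int failures) {
        if (failures < 1) {
            throw new IllegalArgumentException("failures must be >= 1");
        }
        // Bound the exponent; the result is capped at maxMs anyway, so larger values add nothing.
        double exponential = baseMs * Math.pow(2, Math.min(failures - 1, 62));
        double factor = jitter == 0.0 ? 1.0 : 1.0 - jitter + 2.0 * jitter * random.nextDouble();
        return (long) Math.min((double) maxMs, exponential * factor);
    }
}
```

With `baseMs = 100`, `maxMs = 1000` and no jitter, successive failures wait 100, 200, 400 and 800 ms, then 1000 ms for every later attempt. With `jitter = 0.2`, each value is spread over roughly plus or minus 20% (still capped at 1000 ms), so clients that failed at the same moment do not all retry in lockstep.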
Re: [PR] Add kafka topic sync offset lag metrics by JMX [kafka]
mimaison commented on PR #11387: URL: https://github.com/apache/kafka/pull/11387#issuecomment-1954086721 This is being done by KIP-971, so closing this PR.
Re: [PR] Add kafka topic sync offset lag metrics by JMX [kafka]
mimaison closed pull request #11387: Add kafka topic sync offset lag metrics by JMX URL: https://github.com/apache/kafka/pull/11387
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
cadonna commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1495579840 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1730,6 +1744,21 @@ private void subscribeInternal(Pattern pattern, Optional listener) { +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +if (pattern == null || pattern.pattern().isEmpty()) Review Comment: I think we should move the check if the pattern is non-empty (and maybe some other validity checks) to class `SubscriptionPattern`. It should not be possible to create an instance of `SubscriptionPattern` with an empty pattern. ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } +/** + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done periodically against all topics existing at the time of check. + * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering + * the max metadata age, the consumer will refresh metadata more often and check for matching topics. Review Comment: I think this is not correct with the new pattern subscription since the assignment is done broker-side. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ## @@ -84,6 +85,9 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; +/* RE2J compatible regex */ +private SubscriptionPattern subscriptionPattern; Review Comment: What about having a class that contains either a `SubscriptionPattern` or a `Pattern`? 
Something like:
```java
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.SubscriptionPattern;

public class JavaPatternOrSubscriptionPattern {
    // Exactly one of these is non-null; the private constructors enforce it.
    private final Pattern javaPattern;
    private final SubscriptionPattern subscriptionPattern;

    private JavaPatternOrSubscriptionPattern(final Pattern pattern) {
        this.javaPattern = pattern;
        this.subscriptionPattern = null;
    }

    private JavaPatternOrSubscriptionPattern(final SubscriptionPattern pattern) {
        this.javaPattern = null;
        this.subscriptionPattern = pattern;
    }

    public static JavaPatternOrSubscriptionPattern javaPattern(final Pattern pattern) {
        return new JavaPatternOrSubscriptionPattern(pattern);
    }

    public static JavaPatternOrSubscriptionPattern subscriptionPattern(final SubscriptionPattern pattern) {
        return new JavaPatternOrSubscriptionPattern(pattern);
    }

    public String pattern() {
        return subscriptionPattern != null ? subscriptionPattern.pattern() : javaPattern.pattern();
    }

    @Override
    public String toString() {
        return "pattern = " + pattern();
    }

    // ...
}
```
In this way, we would encapsulate the code that ensures that only one of the two is set. ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } +/** + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done periodically against all topics existing at the time of check. + * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering + * the max metadata age, the consumer will refresh metadata more often and check for matching topics. + * + * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the + * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there + * is a change to the topics matching the provided pattern and when consumer group membership changes. + * Group rebalances only take place during an active call to {@link #poll(Duration)}. Review Comment: This should also no longer be correct with KIP-848, which introduces this method.
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -495,6 +496,16 @@ public void subscribe(Pattern pattern) { subscribeInternal(pattern, Optional.empty()); } +@Override +public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { +log.warn("Operation not supported in new consumer group protocol"); +} + +@Override +public void subscribe(SubscriptionPattern pattern) { +log.warn("Operation not supported in new consumer group protocol"); Review Comment: See above ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java: ##
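The first comment in the KAFKA-15561 review suggests rejecting empty patterns when a `SubscriptionPattern` is created, instead of checking in `subscribe`. A minimal sketch of that idea follows. It uses a hypothetical `ValidatedSubscriptionPattern` class rather than Kafka's actual `SubscriptionPattern`, whose constructor is not reproduced here, and it only checks for null or empty input, not RE2J syntax.

```java
import java.util.Objects;

/**
 * Hypothetical value class (not Kafka's SubscriptionPattern): validation happens once, at
 * construction, so every instance that reaches subscribe() is known to be non-empty.
 */
final class ValidatedSubscriptionPattern {
    private final String pattern;

    ValidatedSubscriptionPattern(final String pattern) {
        Objects.requireNonNull(pattern, "pattern must not be null");
        if (pattern.isEmpty()) {
            throw new IllegalArgumentException("pattern must not be empty");
        }
        this.pattern = pattern;
    }

    String pattern() {
        return pattern;
    }

    @Override
    public String toString() {
        return "ValidatedSubscriptionPattern(" + pattern + ")";
    }
}
```

With this shape, `subscribe` would only need to guard against a null argument; the `pattern.pattern().isEmpty()` branch becomes unreachable because no empty instance can exist.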
Re: [PR] KAFKA-12891: KIP-749 Add --files and --file-separator options to the ConsoleProducer (WIP) [kafka]
mimaison closed pull request #10889: KAFKA-12891: KIP-749 Add --files and --file-separator options to the ConsoleProducer (WIP) URL: https://github.com/apache/kafka/pull/10889
Re: [PR] KAFKA-12891: KIP-749 Add --files and --file-separator options to the ConsoleProducer (WIP) [kafka]
mimaison commented on PR #10889: URL: https://github.com/apache/kafka/pull/10889#issuecomment-1954080126 KIP-749 was never voted on and is now abandoned, so I'll close this PR. Feel free to restart the discussion on the KIP if it's still something you want to do.
[jira] [Updated] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic
[ https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16283: -- Description: When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect data to be sent to all partitions in a round-robin manner. But we found that only half of the partitions received the data. This wastes half of the resources (storage, consumers, ...).
{code:java}
> bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server localhost:9092 --partitions 2
Created topic quickstart-events4.
# send 10 records to the topic, expecting 5 records in partition0, and 5 records in partition1
> bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092 partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.
> ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel       224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel      2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel      1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 .timeindex
-rw-r--r--   1 lukchen  wheel         8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel        43  2 20 19:53 partition.metadata
# No records in partition 1
> ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel       224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel      2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel         0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 .timeindex
-rw-r--r--   1 lukchen  wheel         0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel        43  2 20 19:53 partition.metadata
{code}
After a quick look, it's because of abortOnNewBatch, which is triggered each time a new batch is created. > RoundRobinPartitioner will only send to half of the partitions in a topic > > Key: KAFKA-16283 > URL: https://issues.apache.org/jira/browse/KAFKA-16283 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Priority: Major
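The issue attributes the skew to abortOnNewBatch. As we understand the producer's send path, when appending a record would require a new batch, the append is aborted and `partition()` is called a second time, so a round-robin counter advances twice for that record. The self-contained model below (hypothetical names, not Kafka's `RoundRobinPartitioner`) simulates only that counter and which partitions hold an open batch, under the simplifying assumption that batches are never drained.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical model (not Kafka code) of a round-robin partitioner that is consulted a second
 * time whenever its first choice would require opening a new batch.
 */
final class RoundRobinDoubleCallDemo {

    /** Minimal round-robin chooser: every call advances the shared counter by one. */
    static final class SimpleRoundRobin {
        private final AtomicInteger counter = new AtomicInteger(0);

        int partition(int numPartitions) {
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
    }

    /**
     * Returns how many records each partition receives. When {@code repartitionOnNewBatch} is
     * true, a record whose first-choice partition has no open batch is re-partitioned once.
     */
    static int[] distribute(int numRecords, int numPartitions, boolean repartitionOnNewBatch) {
        SimpleRoundRobin partitioner = new SimpleRoundRobin();
        boolean[] hasOpenBatch = new boolean[numPartitions];
        int[] counts = new int[numPartitions];
        for (int i = 0; i < numRecords; i++) {
            int p = partitioner.partition(numPartitions);
            if (repartitionOnNewBatch && !hasOpenBatch[p]) {
                // Appending here would need a new batch: abort and ask the partitioner again.
                p = partitioner.partition(numPartitions);
            }
            hasOpenBatch[p] = true; // whichever partition receives the record now has an open batch
            counts[p]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(10, 2, false))); // [5, 5]
        System.out.println(Arrays.toString(distribute(10, 2, true)));  // [0, 10]
        System.out.println(Arrays.toString(distribute(8, 4, true)));   // [0, 4, 0, 4]
    }
}
```

In this model the skew sustains itself: the partition picked on the first call never receives a batch, so every later record aborts again, and with 4 partitions only partitions 1 and 3 receive data. Which partitions win depends only on where the counter starts; the issue's run filled partition 0, while this model, starting at 0, fills partition 1.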
Re: [PR] KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0 [kafka]
mimaison commented on PR #10626: URL: https://github.com/apache/kafka/pull/10626#issuecomment-1954070991 We can't upgrade to argparse 0.9.0 now as it requires Java 9. Closing this PR for now; we can revisit it once Kafka drops support for Java 8.
[jira] [Updated] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic
[ https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16283: -- Description: When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect data to be sent to all partitions in a round-robin manner. But we found that only half of the partitions received the data. This wastes half of the resources (storage, consumers, ...).
{code:java}
> bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server localhost:9092 --partitions 2
Created topic quickstart-events4.
# send 10 records to the topic, expecting 5 records in partition0, and 5 records in partition1
> bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092 partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.
> ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x   7 lukchen  wheel       224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel      2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel      1151  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 .timeindex
-rw-r--r--   1 lukchen  wheel         8  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel        43  2 20 19:53 partition.metadata
# No records in partition 1
> ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x   7 lukchen  wheel       224  2 20 19:53 .
drwxr-xr-x  70 lukchen  wheel      2240  2 20 19:53 ..
-rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
-rw-r--r--   1 lukchen  wheel         0  2 20 19:53 .log
-rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 .timeindex
-rw-r--r--   1 lukchen  wheel         0  2 20 19:53 leader-epoch-checkpoint
-rw-r--r--   1 lukchen  wheel        43  2 20 19:53 partition.metadata
{code}
> RoundRobinPartitioner will only send to half of the partitions in a topic > > Key: KAFKA-16283 > URL: https://issues.apache.org/jira/browse/KAFKA-16283 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Priority: Major
Re: [PR] KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0 [kafka]
mimaison closed pull request #10626: KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0 URL: https://github.com/apache/kafka/pull/10626
[jira] [Created] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic
Luke Chen created KAFKA-16283:
-
Summary: RoundRobinPartitioner will only send to half of the partitions in a topic
Key: KAFKA-16283
URL: https://issues.apache.org/jira/browse/KAFKA-16283
Project: Kafka
Issue Type: Bug
Reporter: Luke Chen

When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect data to be sent to all partitions in a round-robin manner. But we found that only half of the partitions received data, so half of the resources (storage, consumers, ...) are wasted.

{code:java}
bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server localhost:9092 --partitions 2
Created topic quickstart-events4.
bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092 partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.
lukchen@lukchen-mac kafka % ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x 7 lukchen wheel 224 2 20 19:53 .
drwxr-xr-x 70 lukchen wheel 2240 2 20 19:53 ..
-rw-r--r-- 1 lukchen wheel 10485760 2 20 19:53 .index
-rw-r--r-- 1 lukchen wheel 1151 2 20 19:53 .log
-rw-r--r-- 1 lukchen wheel 10485756 2 20 19:53 .timeindex
-rw-r--r-- 1 lukchen wheel 8 2 20 19:53 leader-epoch-checkpoint
-rw-r--r-- 1 lukchen wheel 43 2 20 19:53 partition.metadata
lukchen@lukchen-mac kafka % ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x 7 lukchen wheel 224 2 20 19:53 .
drwxr-xr-x 70 lukchen wheel 2240 2 20 19:53 ..
-rw-r--r-- 1 lukchen wheel 10485760 2 20 19:53 .index
-rw-r--r-- 1 lukchen wheel 0 2 20 19:53 .log
-rw-r--r-- 1 lukchen wheel 10485756 2 20 19:53 .timeindex
-rw-r--r-- 1 lukchen wheel 0 2 20 19:53 leader-epoch-checkpoint
-rw-r--r-- 1 lukchen wheel 43 2 20 19:53 partition.metadata
{code}
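The report describes the expected behavior: records should go to every partition in turn. The sketch below models that expectation with a plain counter taken modulo the partition count. It does not use Kafka's classes. `RoundRobinSketch`, `nextPartition` and `distribute` are hypothetical names.

The `callsPerRecord` parameter is also an assumption. It is added only to show one way a counter-based chooser could end up hitting half of the partitions: if it were consulted twice per record and only the first answer were kept. The report itself does not identify the root cause.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser; NOT the code of Kafka's RoundRobinPartitioner.
final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Returns the next partition in turn. floorMod keeps the result
    // non-negative even after the int counter overflows.
    int nextPartition(int numPartitions) {
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    // Sends `records` records and counts how many land on each partition.
    // callsPerRecord > 1 models a chooser that is consulted several times
    // per record while only the first answer is used (an assumption).
    static int[] distribute(int records, int numPartitions, int callsPerRecord) {
        RoundRobinSketch chooser = new RoundRobinSketch();
        int[] counts = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            int chosen = chooser.nextPartition(numPartitions);
            // Extra consultations still advance the shared counter.
            for (int c = 1; c < callsPerRecord; c++) {
                chooser.nextPartition(numPartitions);
            }
            counts[chosen]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(10, 2, 1))); // [5, 5]
        System.out.println(Arrays.toString(distribute(10, 2, 2))); // [10, 0]
    }
}
```

With two partitions, one call per record spreads the 10 records evenly. Two calls per record put every record on partition 0, which resembles the symptom above (data only in `quickstart-events4-0`, an empty `.log` in `quickstart-events4-1`). A matching shape is not proof of the same cause.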
[jira] [Updated] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic
[ https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-16283:
--
Description:
When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect data to be sent to all partitions in a round-robin manner. But we found that only half of the partitions received data, so half of the resources (storage, consumers, ...) are wasted.
{code:java}
> bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server
> localhost:9092 --partitions 2
Created topic quickstart-events4.
> bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10
> --record-size 100 --throughput -1 --producer-props
> bootstrap.servers=localhost:9092
> partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.
> ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x 7 lukchen wheel 224 2 20 19:53 .
drwxr-xr-x 70 lukchen wheel 2240 2 20 19:53 ..
-rw-r--r-- 1 lukchen wheel 10485760 2 20 19:53 .index
-rw-r--r-- 1 lukchen wheel 1151 2 20 19:53 .log
-rw-r--r-- 1 lukchen wheel 10485756 2 20 19:53 .timeindex
-rw-r--r-- 1 lukchen wheel 8 2 20 19:53 leader-epoch-checkpoint
-rw-r--r-- 1 lukchen wheel 43 2 20 19:53 partition.metadata
> ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x 7 lukchen wheel 224 2 20 19:53 .
drwxr-xr-x 70 lukchen wheel 2240 2 20 19:53 ..
-rw-r--r-- 1 lukchen wheel 10485760 2 20 19:53 .index
-rw-r--r-- 1 lukchen wheel 0 2 20 19:53 .log
-rw-r--r-- 1 lukchen wheel 10485756 2 20 19:53 .timeindex
-rw-r--r-- 1 lukchen wheel 0 2 20 19:53 leader-epoch-checkpoint
-rw-r--r-- 1 lukchen wheel 43 2 20 19:53 partition.metadata
{code}

was:
When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we expect data to be sent to all partitions in a round-robin manner. But we found that only half of the partitions received data, so half of the resources (storage, consumers, ...) are wasted.
{code:java}
bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server localhost:9092 --partitions 2
Created topic quickstart-events4.
bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 10 --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092 partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
10 records sent, 72.463768 records/sec (0.01 MB/sec), 35.10 ms avg latency, 132.00 ms max latency, 24 ms 50th, 132 ms 95th, 132 ms 99th, 132 ms 99.9th.
lukchen@lukchen-mac kafka % ls -al /tmp/kafka-logs/quickstart-events4-0
total 24
drwxr-xr-x 7 lukchen wheel 224 2 20 19:53 .
drwxr-xr-x 70 lukchen wheel 2240 2 20 19:53 ..
-rw-r--r-- 1 lukchen wheel 10485760 2 20 19:53 .index
-rw-r--r-- 1 lukchen wheel 1151 2 20 19:53 .log
-rw-r--r-- 1 lukchen wheel 10485756 2 20 19:53 .timeindex
-rw-r--r-- 1 lukchen wheel 8 2 20 19:53 leader-epoch-checkpoint
-rw-r--r-- 1 lukchen wheel 43 2 20 19:53 partition.metadata
lukchen@lukchen-mac kafka % ls -al /tmp/kafka-logs/quickstart-events4-1
total 8
drwxr-xr-x 7 lukchen wheel 224 2 20 19:53 .
drwxr-xr-x 70 lukchen wheel 2240 2 20 19:53 ..
-rw-r--r-- 1 lukchen wheel 10485760 2 20 19:53 .index
-rw-r--r-- 1 lukchen wheel 0 2 20 19:53 .log
-rw-r--r-- 1 lukchen wheel 10485756 2 20 19:53 .timeindex
-rw-r--r-- 1 lukchen wheel 0 2 20 19:53 leader-epoch-checkpoint
-rw-r--r-- 1 lukchen wheel 43 2 20 19:53 partition.metadata
{code}

> RoundRobinPartitioner will only send to half of the partitions in a topic
> -
>
> Key: KAFKA-16283
> URL: https://issues.apache.org/jira/browse/KAFKA-16283
> Project: Kafka
> Issue Type: Bug
> Reporter: Luke Chen
> Priority: Major
>
> When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we
> expect data to be sent to all partitions in a round-robin manner. But we found
> that only half of the partitions received data, so half of the
> resources (storage, consumers, ...) are wasted.
> {code:java}
> > bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server
> > localhost:9092 --partitions 2
> Created topic quickstart-events4.
> > bin/kafka-producer-perf-te
Re: [PR] [KAFKA-10718][Kafka Connect]add config settting, skip record when enc… [kafka]
mimaison commented on PR #9592: URL: https://github.com/apache/kafka/pull/9592#issuecomment-1954057264 This PR proposes a public API change, so it requires a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Considering the age of this PR, I'll close it for now. Feel free to open a KIP if you still want this feature.
Re: [PR] [KAFKA-10718][Kafka Connect]add config settting, skip record when enc… [kafka]
mimaison closed pull request #9592: [KAFKA-10718][Kafka Connect]add config settting, skip record when enc… URL: https://github.com/apache/kafka/pull/9592
Re: [PR] KAFKA-10325: KIP-649 implementation [kafka]
mimaison commented on PR #9101: URL: https://github.com/apache/kafka/pull/9101#issuecomment-1954051810 KIP-649 was never voted on and is now abandoned, so I'll close this PR. Feel free to restart the discussion on the KIP if it's still something you want to do.
Re: [PR] KAFKA-10325: KIP-649 implementation [kafka]
mimaison closed pull request #9101: KAFKA-10325: KIP-649 implementation URL: https://github.com/apache/kafka/pull/9101
Re: [PR] KAFKA-10325: KIP-649 implementation [kafka]
mimaison commented on PR #9101: URL: https://github.com/apache/kafka/pull/9101#issuecomment-1954051581 KIP-649 was never voted on and is now abandoned, so I'll close this PR. Feel free to restart the discussion on the KIP if it's still something you want to do.
Re: [PR] KAFKA-10369 [WIP] KIP-655 implementation [kafka]
mimaison commented on PR #9210: URL: https://github.com/apache/kafka/pull/9210#issuecomment-1954049633 KIP-655 was never voted on and is now abandoned, so I'll close this PR. Feel free to restart the discussion on the KIP if it's still something you want to do.
Re: [PR] KAFKA-10369 [WIP] KIP-655 implementation [kafka]
mimaison closed pull request #9210: KAFKA-10369 [WIP] KIP-655 implementation URL: https://github.com/apache/kafka/pull/9210
Re: [PR] KAFKA-10281: [WIP] Add log compression analysis tool KIP-640 [kafka]
mimaison commented on PR #9193: URL: https://github.com/apache/kafka/pull/9193#issuecomment-1954047088 Considering KIP-640 was never voted on and is now abandoned, I'll close this PR. Feel free to restart the discussion on the KIP if it's still something you want to do.
Re: [PR] KAFKA-10281: [WIP] Add log compression analysis tool KIP-640 [kafka]
mimaison closed pull request #9193: KAFKA-10281: [WIP] Add log compression analysis tool KIP-640 URL: https://github.com/apache/kafka/pull/9193
Re: [PR] KAFKA-10116: GraalVM native-image prototype [kafka]
mimaison commented on PR #8830: URL: https://github.com/apache/kafka/pull/8830#issuecomment-1954044169 This is now being done via KIP-974. Considering the age of this PR, I'll close it.
Re: [PR] KAFKA-10116: GraalVM native-image prototype [kafka]
mimaison closed pull request #8830: KAFKA-10116: GraalVM native-image prototype URL: https://github.com/apache/kafka/pull/8830
[jira] [Updated] (KAFKA-16281) Possible IllegalState with KIP-996
[ https://issues.apache.org/jira/browse/KAFKA-16281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jack Vanlightly updated KAFKA-16281:
Description:
I have a TLA+ model of KIP-996 (pre-vote), and I have identified an IllegalState exception that would occur with the existing MaybeHandleCommonResponse behavior.

The issue stems from the fact that a leader, let's call it r1, can resign (either due to a restart or check quorum) and then later initiate a pre-vote where it ends up in the same epoch as before. When r1 receives a response from r2, which believes that r1 is still the leader, the logic in MaybeHandleCommonResponse tries to transition r1 to follower of itself, causing an IllegalState exception to be raised.

This is an example history:
# r1 is the leader in epoch 1.
# r1 quorum resigns, or restarts and resigns.
# r1 experiences an election timeout and transitions to Prospective.
# r1 sends a pre-vote request to its peers.
# r2 thinks r1 is still the leader and sends a vote response that does not grant its vote, setting leaderId=r1 and epoch=1.
# r1 receives the vote response and executes MaybeHandleCommonResponse, which tries to transition r1 to Follower of itself, and an illegal state occurs.

The relevant else-if statement in MaybeHandleCommonResponse is here: [https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538]

In the TLA+ specification, I fixed this issue by adding a fourth condition to this statement: the replica must not be in the Prospective state.
[https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336|https://github.com/Vanlightly/kafka-tlaplus/blob/421f170ba4bd8c5eceb36b88b47901ee3d9c3d2a/kraft/kip_996/kraft_kip_996_functions.tla#L336]

Note that I also had to implement the sending of the BeginQuorumEpoch request by the leader to prevent a replica from getting stuck in Prospective. Suppose replica r2 experiences an election timeout due to a transient connectivity issue with the leader, and has also fallen slightly behind. Then r2 will remain stuck in Prospective, because none of its peers, which still have connectivity to the leader, will grant it a pre-vote. To enable r2 to become a functional member again, the leader must give it a nudge with a BeginQuorumEpoch request.

The alternative (which I have also modeled) is for a Prospective to transition to Follower when it receives a negative pre-vote response with a non-null leaderId. This comes with a separate liveness issue, which I can discuss if this "transition to Follower" approach is of interest. Either way, a stuck Prospective needs a way to eventually transition to Follower if all other members have a stable leader.

was:
I have a TLA+ model of KIP-966, and I have identified an IllegalState exception that would occur with the existing MaybeHandleCommonResponse behavior.

The issue stems from the fact that a leader, let's call it r1, can resign (either due to a restart or check quorum) and then later initiate a pre-vote where it ends up in the same epoch as before, but with a cleared local leader id. When r1 transitions to Prospective, it clears its local leader id. When r1 receives a response from r2, which believes that r1 is still the leader, the logic in MaybeHandleCommonResponse tries to transition r1 to follower of itself, causing an IllegalState exception to be raised.

This is an example history:
# r1 is the leader in epoch 1.
# r1 quorum resigns, or restarts and resigns.
# r1 experiences an election timeout and transitions to Prospective, clearing its local leader id.
# r1 sends a pre-vote request to its peers.
# r2 thinks r1 is still the leader and sends a vote response that does not grant its vote, setting leaderId=r1 and epoch=1.
# r1 receives the vote response and executes MaybeHandleCommonResponse, which tries to transition r1 to Follower of itself, and an illegal state occurs.

The relevant else-if statement in MaybeHandleCommonResponse is here: https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538

In the TLA+ specification, I fixed this issue by adding a fourth condition to this statement: the leaderId must not equal this server's id.
[https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336]

We should probably create a test to confirm the issue first and then look at using the fix I made in the TLA+ model, though there may be other options.

> Possible IllegalState with KIP-996
> --
>
> Key: KAFKA-16281
> URL: https://issues.apache.org/jira/browse/KAFKA-16281
> Project: Kafka
> Issue Type: Task
>
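The current description fixes the problem by adding one condition to an existing else-if. The sketch below shows that guard. It paraphrases the three existing conditions from the issue text: same epoch, the response names a leader, and the replica has no known leader after entering Prospective. That paraphrase, the `State` enum and all method names are assumptions for illustration. They are not copied from `KafkaRaftClient`.

```java
import java.util.OptionalInt;

// Hypothetical, simplified model; not KafkaRaftClient's real types or code.
final class PreVoteGuardSketch {
    enum State { UNATTACHED, PROSPECTIVE, CANDIDATE, LEADER, RESIGNED, FOLLOWER }

    // Assumed paraphrase of the existing else-if: the response is for our epoch,
    // it names a leader, and we currently know no leader.
    static boolean existingCondition(int localEpoch, boolean localHasLeader,
                                     int responseEpoch, OptionalInt responseLeaderId) {
        return responseEpoch == localEpoch
            && responseLeaderId.isPresent()
            && !localHasLeader;
    }

    // Same check plus the fourth condition from the TLA+ fix:
    // a Prospective replica must not take this transition.
    static boolean guardedCondition(State state, int localEpoch, boolean localHasLeader,
                                    int responseEpoch, OptionalInt responseLeaderId) {
        return existingCondition(localEpoch, localHasLeader, responseEpoch, responseLeaderId)
            && state != State.PROSPECTIVE;
    }

    public static void main(String[] args) {
        // Steps 3-6 of the example history: r1 (id 1) is Prospective in epoch 1
        // with no known leader; r2's pre-vote response carries leaderId=1, epoch=1.
        OptionalInt leaderFromR2 = OptionalInt.of(1);
        // true: r1 would try to become a follower of itself
        System.out.println(existingCondition(1, false, 1, leaderFromR2));
        // false: the guard skips the transition while Prospective
        System.out.println(guardedCondition(State.PROSPECTIVE, 1, false, 1, leaderFromR2));
    }
}
```

The description also mentions an alternative: a Prospective replica transitions to Follower on a negative pre-vote response with a non-null leaderId. That alternative would be a separate branch and is not modeled here.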
Re: [PR] KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation [kafka]
mimaison closed pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation URL: https://github.com/apache/kafka/pull/8846
Re: [PR] KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation [kafka]
mimaison commented on PR #8846: URL: https://github.com/apache/kafka/pull/8846#issuecomment-1954041148 This was done in https://github.com/apache/kafka/pull/14111, so I'm closing this PR.
Re: [PR] java 8 code level update [kafka]
mimaison closed pull request #6454: java 8 code level update URL: https://github.com/apache/kafka/pull/6454
Re: [PR] java 8 code level update [kafka]
mimaison commented on PR #6454: URL: https://github.com/apache/kafka/pull/6454#issuecomment-1954032896 It looks like most of these changes have been made since the PR was originally opened. Considering its age, I'm going to close it.
Re: [PR] KIP-228 Negative record timestamp support [kafka]
mimaison commented on PR #5072: URL: https://github.com/apache/kafka/pull/5072#issuecomment-1954026617 The KIP was never voted on and is currently abandoned, so I'll close this PR for now. Feel free to restart the discussion on the KIP if it's still something you want.
Re: [PR] KIP-228 Negative record timestamp support [kafka]
mimaison closed pull request #5072: KIP-228 Negative record timestamp support URL: https://github.com/apache/kafka/pull/5072
[jira] [Updated] (KAFKA-16281) Possible IllegalState with KIP-996
[ https://issues.apache.org/jira/browse/KAFKA-16281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Vanlightly updated KAFKA-16281: Summary: Possible IllegalState with KIP-996 (was: Probable IllegalState possible with KIP-966) > Possible IllegalState with KIP-996 > -- > > Key: KAFKA-16281 > URL: https://issues.apache.org/jira/browse/KAFKA-16281 > Project: Kafka > Issue Type: Task > Components: kraft >Reporter: Jack Vanlightly >Priority: Major > > I have a TLA+ model of KIP-966 and I have identified an IllegalState > exception that would occur with the existing MaybeHandleCommonResponse > behavior. > The issue stems from the fact that a leader, let's call it r1, can resign > (either due to a restart or check quorum) and then later initiate a pre-vote > where it ends up in the same epoch as before, but a cleared local leader id. > When r1 transitions to Prospective it clears its local leader id. When r1 > receives a response from r2 who believes that r1 is still the leader, the > logic in MaybeHandleCommonResponse tries to transition r1 to follower of > itself, causing an IllegalState exception to be raised. > This is an example history: > # r1 is the leader in epoch 1. > # r1 quorum resigns, or restarts and resigns. > # r1 experiences an election timeout and transitions to Prospective clearing > its local leader id. > # r1 sends a pre vote request to its peers. > # r2 thinks r1 is still the leader, sends a vote response, not granting its > vote and setting leaderId=r1 and epoch=1. > # r1 receives the vote response and executes MaybeHandleCommonResponse which > tries to transition r1 to Follower of itself and an illegal state occurs. 
> The relevant else if statement in MaybeHandleCommonResponse is here: > https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538 > In the TLA+ specification, I fixed this issue by adding a fourth condition to > this statement, that the leaderId also does not equal this server's id. > [https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336] > We should probably create a test to confirm the issue first and then look at > using the fix I made in the TLA+, though there may be other options.
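The version of the fix quoted in this email guards on identity instead of state: the transition is skipped when the response's leaderId equals the local server's id. The sketch below shows that guard under stated assumptions. The names are hypothetical, and the existing conditions are paraphrased from the issue rather than copied from `KafkaRaftClient`. A stand-in `transitionToFollower` throws `IllegalStateException` when asked to follow itself, which is the failure the issue reports.

```java
import java.util.OptionalInt;

// Hypothetical, simplified model; not KafkaRaftClient's real code.
final class SelfLeaderGuardSketch {
    // Stand-in for the transition: following yourself is an illegal state.
    static void transitionToFollower(int localId, int leaderId) {
        if (leaderId == localId) {
            throw new IllegalStateException("Replica " + localId + " cannot follow itself");
        }
        // A real implementation would update the quorum state here.
    }

    // Assumed paraphrase of the existing conditions plus the fourth one:
    // the leaderId in the response must not be this server's id.
    static boolean shouldFollow(int localId, int localEpoch, boolean localHasLeader,
                                int responseEpoch, OptionalInt responseLeaderId) {
        return responseEpoch == localEpoch
            && responseLeaderId.isPresent()
            && !localHasLeader
            && responseLeaderId.getAsInt() != localId;
    }

    // Handles a response; returns true if a transition happened.
    static boolean handleResponse(int localId, int localEpoch, boolean localHasLeader,
                                  int responseEpoch, OptionalInt responseLeaderId) {
        if (shouldFollow(localId, localEpoch, localHasLeader, responseEpoch, responseLeaderId)) {
            transitionToFollower(localId, responseLeaderId.getAsInt());
            return true;
        }
        return false;
    }
}
```

Both this guard and the Prospective-state guard avoid the self-transition in the example history. The issue itself suggests confirming with a test which one matches the intended protocol.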
Re: [PR] KAFKA-2111: Add help arguments and required fields [kafka]
mimaison commented on PR #3605: URL: https://github.com/apache/kafka/pull/3605#issuecomment-1954006952 This PR is very old and the tools are being rewritten in Java, so I'll close this PR.
Re: [PR] KAFKA-2111: Add help arguments and required fields [kafka]
mimaison closed pull request #3605: KAFKA-2111: Add help arguments and required fields URL: https://github.com/apache/kafka/pull/3605
Re: [PR] Update KafkaLog4jAppender.java adding SaslMechanism as an option [kafka]
mimaison commented on PR #3476: URL: https://github.com/apache/kafka/pull/3476#issuecomment-1954004851 Considering the age of this PR and that KIP-719 to deprecate the log4j appender has been voted, I'll close this PR.
Re: [PR] Update KafkaLog4jAppender.java adding SaslMechanism as an option [kafka]
mimaison closed pull request #3476: Update KafkaLog4jAppender.java adding SaslMechanism as an option URL: https://github.com/apache/kafka/pull/3476
Re: [PR] KAFKA-3663 : Implements KIP-59 for a kafka-brokers.sh command [kafka]
mimaison commented on PR #1539: URL: https://github.com/apache/kafka/pull/1539#issuecomment-1953997528 KIP-59 has never been voted on and is abandoned, so I'm closing this PR. Feel free to restart the KIP discussion if it's still something you want to do.
Re: [PR] KAFKA-3663 : Implements KIP-59 for a kafka-brokers.sh command [kafka]
mimaison closed pull request #1539: KAFKA-3663 : Implements KIP-59 for a kafka-brokers.sh command URL: https://github.com/apache/kafka/pull/1539
[jira] [Updated] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-16282:
--
Description:
Currently, when using `kafka-get-offsets.sh` to get the offset by time, we have these choices:
{code:java}
--time /                  timestamp of the offsets before that.
-1 or latest /            [Note: No offset is returned, if the
-2 or earliest /          timestamp greater than recently
-3 or max-timestamp /     committed record timestamp is
-4 or earliest-local /    given.] (default: latest)
-5 or latest-tiered
{code}
For the "latest" option, it'll always return the "high watermark" because we always send the request with the default option: *IsolationLevel.READ_UNCOMMITTED*. It would be good if the command could also return the last stable offset (LSO), for transaction support. That is, it would send the request with *IsolationLevel.READ_COMMITTED*.

was:
Currently, when using `kafka-get-offsets.sh` to get the offset by time, we have these choices:
{code:java}
--time /                  timestamp of the offsets before that.
-1 or latest /            [Note: No offset is returned, if the
-2 or earliest /          timestamp greater than recently
-3 or max-timestamp /     committed record timestamp is
-4 or earliest-local /    given.] (default: latest)
-5 or latest-tiered
{code}
For the latest choice, it'll always return the "high watermark" because we always send the request with the default option: *IsolationLevel.READ_UNCOMMITTED*. It would be good if the command could also return the last stable offset (LSO), for transaction support. That is, it would send the request with *IsolationLevel.READ_COMMITTED*.

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
> Issue Type: Improvement
> Reporter: Luke Chen
> Assignee: Ahmed Sobeh
> Priority: Major
> Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we
> have these choices:
> {code:java}
> --time /                  timestamp of the offsets before that.
> -1 or latest /            [Note: No offset is returned, if the
> -2 or earliest /          timestamp greater than recently
> -3 or max-timestamp /     committed record timestamp is
> -4 or earliest-local /    given.] (default: latest)
> -5 or latest-tiered
> {code}
> For the "latest" option, it'll always return the "high watermark" because we
> always send the request with the default option: *IsolationLevel.READ_UNCOMMITTED*.
> It would be good if the command could also return the last stable offset (LSO),
> for transaction support. That is, it would send the request with
> *IsolationLevel.READ_COMMITTED*.
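The request comes down to which offset "latest" means under each isolation level. The model below treats the last stable offset as the high watermark capped by the first offset of the earliest still-open transaction. It is a simplified model, not broker code, and `LastStableOffsetSketch` and its methods are hypothetical.

In the Java Admin client, the isolation level for `Admin#listOffsets` with `OffsetSpec.latest()` is passed through `ListOffsetsOptions`, for example `new ListOffsetsOptions(IsolationLevel.READ_COMMITTED)`. How the command-line tool should expose this is left to the KIP that the `need-kip` label calls for.

```java
import java.util.List;

// Simplified model of the two "latest" answers; not Kafka broker code.
final class LastStableOffsetSketch {
    // LSO: records of a transaction that is still open (neither committed
    // nor aborted) must not be exposed, so cap the high watermark at the
    // earliest open transaction's first offset.
    static long lastStableOffset(long highWatermark, List<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long firstOffset : openTxnFirstOffsets) {
            lso = Math.min(lso, firstOffset);
        }
        return lso;
    }

    // What a "latest" lookup returns under each isolation level in this model.
    static long latestOffset(boolean readCommitted, long highWatermark,
                             List<Long> openTxnFirstOffsets) {
        return readCommitted
            ? lastStableOffset(highWatermark, openTxnFirstOffsets) // READ_COMMITTED
            : highWatermark;                                      // READ_UNCOMMITTED
    }

    public static void main(String[] args) {
        // High watermark 100, transactions still open from offsets 40 and 70.
        List<Long> open = List.of(40L, 70L);
        System.out.println(latestOffset(false, 100, open)); // 100
        System.out.println(latestOffset(true, 100, open));  // 40
    }
}
```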
[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818759#comment-17818759 ]

Luke Chen commented on KAFKA-16282:
---

Great! Thanks [~ahmedsobeh]! Let me know if you need any help.

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
> Issue Type: Improvement
> Reporter: Luke Chen
> Assignee: Ahmed Sobeh
> Priority: Major
> Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we
> have these choices:
> {code:java}
> --time /                  timestamp of the offsets before that.
> -1 or latest /            [Note: No offset is returned, if the
> -2 or earliest /          timestamp greater than recently
> -3 or max-timestamp /     committed record timestamp is
> -4 or earliest-local /    given.] (default: latest)
> -5 or latest-tiered
> {code}
> For the latest choice, it'll always return the "high watermark" because we
> always send the request with the default option: *IsolationLevel.READ_UNCOMMITTED*.
> It would be good if the command could also return the last stable offset (LSO),
> for transaction support. That is, it would send the request with
> *IsolationLevel.READ_COMMITTED*.
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818755#comment-17818755 ]

Ondrej Cervinka commented on KAFKA-15302:
-

Hello,

Deleting from a state store while iterating over it with KeyValueStore#all or KeyValueStore#range is a pattern I sometimes use too. It may actually be quite a common approach for this forward-then-delete scenario. If there won't be a fix, updating the documentation would be helpful.

Are these all valid workarounds? Note that some are quite limiting.
1. Flush before iterating.
2. Delete (modify) after the iteration has finished (storing the keys to delete in a temporary set).
3. Use in-memory stores instead of RocksDB.
4. Disable the Streams (heap) cache (or set statestore.cache.max.bytes to 0).

> Stale value returned when using store.all() in punctuation function.
>
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.1
> Reporter: Jinyong Choi
> Priority: Major
>
> When using the store.all() function within the Punctuation function of
> this.context.schedule, the previous value is returned. In other words, even
> though the stored value has been updated from 1 to 2, it doesn't return 2;
> instead, it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while
> this doesn't occur 100% of the time, by adding logs, it's evident that during
> the while loop after all() is called, the cache is flushed. As a result, the
> named cache holds a null value, causing the return of a value from RocksDB.
> This is observed as the value after the .get() call is different from the
> expected value. This is possibly due to the consistent read functionality of
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be
> any errors.
> > * test code (forked from balajirrao and modified for this) > [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main] > > {code:java} > private void forwardAll(final long timestamp) { > // > System.err.println("forwardAll Start"); KeyValueIterator Integer> kvList = this.kvStore.all(); > while (kvList.hasNext()) { > KeyValue entry = kvList.next(); > final Record msg = new Record<>(entry.key, > entry.value, context.currentSystemTimeMs()); > final Integer storeValue = this.kvStore.get(entry.key); if > (entry.value != storeValue) { > System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: > " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + > " but KeyValueIterator value: " + entry.value); > throw new RuntimeException("Broken!"); > } this.context.forward(msg); > } > kvList.close(); > } > {code} > * log file (add log in stream source) > > {code:java} > # console log > sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1" > [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20) > ... > [info] running Coordinator 1 > appid: 95108c48-7c69-4eeb-adbd-9d091bd84933 > [0] starting instance +1 > forwardAll Start > [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but > KeyValueIterator value: 1 > # log file > ... 
> 01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
> 01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator all()
> 01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
> 01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4
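Workaround 2 from the comment (defer the deletes until the iteration has finished) can be sketched as follows. This only illustrates the pattern and is not a confirmed fix for KAFKA-15302; whether it fully avoids the stale reads is exactly what the comment asks. To keep the sketch self-contained, a `NavigableMap` stands in for the state store and a `BiConsumer` stands in for `context.forward`; `DeferredDeleteSketch` and `forwardThenDelete` are hypothetical names. With a real `KeyValueStore`, the loop would run over `store.all()` inside try-with-resources and the deletes would call `store.delete(key)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.BiConsumer;

// Hypothetical stand-in: a NavigableMap plays the role of the KeyValueStore,
// and the BiConsumer plays the role of context.forward(...).
final class DeferredDeleteSketch {

    // Forwards every entry while collecting its key, and deletes the collected
    // keys only after the scan is complete (workaround 2).
    static <K, V> List<K> forwardThenDelete(NavigableMap<K, V> store, BiConsumer<K, V> forward) {
        List<K> toDelete = new ArrayList<>();
        // With a real store this would be try (KeyValueIterator<K, V> it = store.all()) { ... }
        for (Map.Entry<K, V> entry : store.entrySet()) {
            forward.accept(entry.getKey(), entry.getValue());
            toDelete.add(entry.getKey());
        }
        // The iteration has finished, so these deletes cannot affect it.
        for (K key : toDelete) {
            store.remove(key);
        }
        return toDelete;
    }
}
```

Collecting keys costs extra memory proportional to the number of deleted entries, which is the "quite limiting" trade-off the comment alludes to for large stores.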
Re: [PR] MINIOR: Remove accidentally logs [kafka]
mimaison merged PR #15371: URL: https://github.com/apache/kafka/pull/15371 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953901380 @cadonna I'll see if there is a way to get around using the Google library to check regex validity (fingers crossed!)
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
cadonna commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953893744 > > > I was wondering whether we should introduce a new error code to signal to the user that the regular expression is invalid. Otherwise, we would have to use the invalid request exception and it does not seem user-friendly. @cadonna @lianetm What do you think about this? > > > > > > @dajac Is it also possible to verify the validity of the regular expression client-side? > > If we assume that the clients send valid regular expressions to the brokers, I think it would be OK to return an invalid request exception and log the error broker-side. Sending invalid regular expressions should then just be a mistake that happens during development of the clients and not something that happens during usage of the clients. > > The benefit would be to find the mistakes in regular expressions without a request to the brokers. > > The downside is that we need some way to validate the regular expressions client-side, like the corresponding Google library in Java, and I do not know what dependencies are needed for clients in other languages. > > I don't think we can include the Google library in the client code. I saw the comment about it on the pull request for the implementation of the regex logic on the broker. Will find it again and quote it here. > > Edit: here is the comment [#14327 (review)](https://github.com/apache/kafka/pull/14327#pullrequestreview-1622366023) Ah, I see! Thanks for the link! Nevertheless, that comment is about resolving the regular expression against the existing topics on the brokers. It does not say anything about checking the general validity of the regular expression. I definitely agree that finding the topics that match the regular expression must be done strictly on the brokers.
[jira] [Comment Edited] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818729#comment-17818729 ] Ahmed Sobeh edited comment on KAFKA-16282 at 2/20/24 10:14 AM: --- Hi [~showuon]! I will be picking this up. I will start on the KIP doc and post it here once it's ready.
was (Author: JIRAUSER295920): Hi [~showuon]! I will be picking this up. I will start on the KIP docs and post it here once it's ready
> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
> Issue Type: Improvement
> Reporter: Luke Chen
> Assignee: Ahmed Sobeh
> Priority: Major
> Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we have these choices:
> {code:java}
> --time /                   timestamp of the offsets before that.
>   -1 or latest /           [Note: No offset is returned, if the
>   -2 or earliest /         timestamp greater than recently
>   -3 or max-timestamp /    committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered
> {code}
> For the latest option, the command always returns the high watermark, because the request is always sent with the default isolation level, *IsolationLevel.READ_UNCOMMITTED*. It would be good if the command also supported getting the last stable offset (LSO) for transactional use cases, that is, sending the request with *IsolationLevel.READ_COMMITTED*.
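The difference between the two isolation levels for the "latest" offset can be modeled in a few lines. This is a minimal sketch, assuming the partition's high watermark and the first offsets of its still-open transactions are already known; `LsoSketch` and its methods are hypothetical names and not part of the Kafka tools. A READ_UNCOMMITTED lookup reports the high watermark, while a READ_COMMITTED lookup reports the LSO: the first offset of the earliest open transaction, or the high watermark if no transaction is open.

```java
import java.util.Collection;

// Hypothetical model of one partition: its high watermark plus the first
// offsets of transactions that are still open (neither committed nor aborted).
final class LsoSketch {

    // What a READ_UNCOMMITTED "latest" lookup reports: the high watermark.
    static long latestReadUncommitted(long highWatermark) {
        return highWatermark;
    }

    // What a READ_COMMITTED "latest" lookup reports: the last stable offset,
    // i.e. the first offset of the earliest open transaction, capped at the HW.
    static long latestReadCommitted(long highWatermark, Collection<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long firstOffset : openTxnFirstOffsets) {
            lso = Math.min(lso, firstOffset);
        }
        return lso;
    }
}
```

For example, with a high watermark of 100 and open transactions starting at offsets 90 and 95, the two lookups would report 100 and 90 respectively.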
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953879987 > > I was wondering whether we should introduce a new error code to signal to the user that the regular expression is invalid. Otherwise, we would have to use the invalid request exception and it does not seem user-friendly. @cadonna @lianetm What do you think about this? > > > > @dajac Is it also possible to verify the validity of the regular expression client-side? > > If we assume that the clients send valid regular expressions to the brokers, I think it would be OK to return an invalid request exception and log the error broker-side. Sending invalid regular expressions should then just be a mistake that happens during development of the clients and not something that happens during usage of the clients. > > > > The benefit would be to find the mistakes in regular expressions without a request to the brokers. > > > > The downside is that we need some way to validate the regular expressions client-side, like the corresponding Google library in Java, and I do not know what dependencies are needed for clients in other languages. > > I don't think we can include the Google library in the client code. I saw the comment about it on the pull request for the implementation of the regex logic on the broker. Will find it again and quote it here.
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
cadonna commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953872828 > I was wondering whether we should introduce a new error code to signal to the user that the regular expression is invalid. Otherwise, we would have to use the invalid request exception and it does not seem user-friendly. @cadonna @lianetm What do you think about this? @dajac Is it also possible to verify the validity of the regular expression client-side? If we assume that the clients send valid regular expressions to the brokers, I think it would be OK to return an invalid request exception and log the error broker-side. Sending invalid regular expressions should then just be a mistake that happens during development of the clients and not something that happens during usage of the clients. The benefit would be to find the mistakes in regular expressions without a request to the brokers. The downside is that we need some way to validate the regular expressions client-side, like the corresponding Google library in Java, and I do not know what dependencies are needed for clients in other languages.
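To make the trade-off discussed in this thread concrete, here is a rough sketch of a client-side pre-check built only on `java.util.regex`, so it needs no extra dependency. It is an approximation, not the check the thread proposes: the Google library mentioned here is presumably RE2/J (the regex flavor KIP-848 refers to), whose syntax differs from `java.util.regex`. This sketch can therefore accept patterns RE2/J rejects (backreferences such as `(a)\1`, for example) and may reject some that RE2/J accepts. `RegexPrecheck` and `syntaxError` are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical, approximate client-side check using java.util.regex only.
// It does NOT implement the RE2/J syntax evaluated by the brokers.
final class RegexPrecheck {

    // Returns the syntax error description, or empty if the pattern compiles.
    static Optional<String> syntaxError(String regex) {
        try {
            Pattern.compile(regex);
            return Optional.empty();
        } catch (PatternSyntaxException e) {
            return Optional.of(e.getDescription());
        }
    }
}
```

Because the two syntaxes diverge, a check like this could only catch obvious mistakes early; the broker would still have to reject invalid patterns, which is why the error-code question remains open.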
[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818729#comment-17818729 ] Ahmed Sobeh commented on KAFKA-16282: - Hi [~showuon]! I will be picking this up. I will start on the KIP docs and post it here once it's ready
[jira] [Updated] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group
[ https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16243: --- Fix Version/s: 3.8.0
> Idle kafka-console-consumer with new consumer group protocol preemptively leaves group
>
> Key: KAFKA-16243
> URL: https://issues.apache.org/jira/browse/KAFKA-16243
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.7.0
> Reporter: Andrew Schofield
> Assignee: Lucas Brutschy
> Priority: Critical
> Fix For: 3.8.0
>
> Using the new consumer group protocol with kafka-console-consumer.sh, I find that if I leave the consumer with no records to process for 5 minutes (max.poll.interval.ms = 300000 ms), the tool logs the following warning message and leaves the group:
> "consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."
> With the older consumer, this did not occur.
> The reason is that the consumer keeps a poll timer, which is used to ensure liveness of the application thread. With the older consumer, the poll timer is updated automatically while the `Consumer.poll(Duration)` method is blocked, whereas the newer consumer only updates the poll timer when a new call to `Consumer.poll(Duration)` is issued. This means that the kafka-console-consumer.sh tool, which uses a very long timeout by default, behaves differently with the new consumer.
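The difference described in the issue can be reproduced with a small fake-clock simulation. This is a hypothetical model, not the consumer's code and not the change merged in PR #15372: `PollTimerSketch`, `PollTimer`, and `expiresDuringIdlePoll` are invented names, and the loop simply advances a simulated clock while a single `poll(Duration)` call finds no records. With `resetWhileBlocked = false` (the newer consumer's behavior as described above), the timer expires once the idle time reaches `max.poll.interval.ms`; with `true` (the older consumer's behavior), it does not.

```java
// Hypothetical fake-clock simulation; no real consumer and no real sleeping.
final class PollTimerSketch {

    // Liveness timer: expires if it is not reset within maxPollIntervalMs.
    static final class PollTimer {
        private final long maxPollIntervalMs;
        private long lastResetMs;

        PollTimer(long maxPollIntervalMs, long nowMs) {
            this.maxPollIntervalMs = maxPollIntervalMs;
            this.lastResetMs = nowMs;
        }

        void reset(long nowMs) {
            lastResetMs = nowMs;
        }

        boolean isExpired(long nowMs) {
            return nowMs - lastResetMs >= maxPollIntervalMs;
        }
    }

    // Simulates one poll(Duration) call that finds no records for its whole
    // timeout, waking up every stepMs. Returns true if the timer expired.
    static boolean expiresDuringIdlePoll(long maxPollIntervalMs, long pollTimeoutMs,
                                         long stepMs, boolean resetWhileBlocked) {
        long now = 0;
        PollTimer timer = new PollTimer(maxPollIntervalMs, now); // reset on entry to poll()
        while (now < pollTimeoutMs) {
            now += stepMs;
            if (resetWhileBlocked) {
                // The application thread is still inside poll(), so treat it as alive.
                timer.reset(now);
            }
            if (timer.isExpired(now)) {
                return true; // in the issue, this is where the member leaves the group
            }
        }
        return false;
    }
}
```

For instance, `expiresDuringIdlePoll(300_000, 600_000, 1_000, false)` returns `true`, while the same call with `true` returns `false`.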
[jira] [Assigned] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Sobeh reassigned KAFKA-16282: --- Assignee: Ahmed Sobeh
Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]
lucasbru merged PR #15372: URL: https://github.com/apache/kafka/pull/15372
Re: [PR] IGNORE: flaky testing [kafka]
lucasbru closed pull request #14780: IGNORE: flaky testing URL: https://github.com/apache/kafka/pull/14780
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
cadonna commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1953815656 > > > @cadonna @lianetm, since we're supporting both subscribe methods, using java.util.regex.Pattern and SubscriptionPattern, I think we should throw an illegal heartbeat exception when the user tries to use both methods at the same time and tell the user to use one at a time, since the field SubscribedRegex is used for java.util.regex.Pattern as well as SubscriptionPattern. What do you guys think? > > > > > > IMO, we must support the deprecated pattern subscriptions with `java.util.regex.Pattern` to ensure backwards compatibility, but we do not need to support mixed usage of `java.util.regex.Pattern` and Google regex patterns. I think this is a blind spot in the KIP. I propose to throw an `IllegalStateException` if `subscribe(java.util.regex.Pattern)` is called after `subscribe(SubscriptionPattern)` (and vice versa) without calling `unsubscribe()` in between. That is similar to the restrictions between pattern, topic, and partition subscriptions @lianetm linked above. I do not think it is worth considering the edge case of mixed usage of the two pattern types. Does this make sense to you? \cc @dajac What do you, as the original author of the KIP, think? Should we update the KIP to make this clear? > > @cadonna I would rather follow what we already do with `subscribe` today. The last one called takes precedence. @dajac The javadocs of `subscribe()` say:
```
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
```
https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L727 One could argue that both subscriptions are pattern subscriptions, but they are quite different internally. I am wondering how complex it would be to allow mixed usage.
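The rule proposed in this comment (throw an `IllegalStateException` when switching between `subscribe(java.util.regex.Pattern)` and `subscribe(SubscriptionPattern)` without an `unsubscribe()` in between) can be sketched as a small state guard. `SubscriptionGuard` and its enum are hypothetical and only model the rule; they are not `KafkaConsumer`'s internal subscription bookkeeping. Under dajac's alternative (the last call takes precedence), `setType` would overwrite the type instead of throwing when both the old and the new type are pattern subscriptions.

```java
// Hypothetical model of the proposed rule; not KafkaConsumer's real code.
final class SubscriptionGuard {

    enum Type { NONE, TOPICS, JAVA_PATTERN, SUBSCRIPTION_PATTERN, ASSIGNED }

    private Type current = Type.NONE;

    // Shared by every subscribe()/assign() variant: repeating the same kind of
    // subscription is allowed, switching kinds requires unsubscribe() first.
    void setType(Type requested) {
        if (current != Type.NONE && current != requested) {
            throw new IllegalStateException("Cannot switch from " + current + " to "
                    + requested + " without calling unsubscribe() first");
        }
        current = requested;
    }

    // Models subscribe(java.util.regex.Pattern).
    void subscribeJavaPattern() {
        setType(Type.JAVA_PATTERN);
    }

    // Models subscribe(SubscriptionPattern).
    void subscribeSubscriptionPattern() {
        setType(Type.SUBSCRIPTION_PATTERN);
    }

    void unsubscribe() {
        current = Type.NONE;
    }

    Type current() {
        return current;
    }
}
```

Treating the two pattern kinds as distinct types keeps the check identical to the existing topic/pattern/assignment exclusivity, which is the similarity the comment points to.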