[GitHub] [kafka] guozhangwang commented on pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks
guozhangwang commented on pull request #11112: URL: https://github.com/apache/kafka/pull/11112#issuecomment-885414788 Would not merge to 3.0 since it is not a blocker and we've passed the code freeze deadline. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks
guozhangwang merged pull request #11112: URL: https://github.com/apache/kafka/pull/11112
[GitHub] [kafka] guozhangwang commented on pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks
guozhangwang commented on pull request #11112: URL: https://github.com/apache/kafka/pull/11112#issuecomment-885414377 Test failures are known. Merging to trunk.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11114: KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback
guozhangwang commented on a change in pull request #11114: URL: https://github.com/apache/kafka/pull/11114#discussion_r675318036

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ##
@@ -102,49 +102,61 @@ private JoinWindows(final long beforeMs,
  /**
  * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
- * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
- * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
- * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
- * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
+ * the timestamp of the record from the primary stream.
+ *
+ * Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which
+ * means that only out-of-order records arriving more than the grace period after the window end will be dropped.
+ * The window close, after which any incoming records are considered late and will be rejected, is defined as
+ * {@code windowEnd + afterWindowEnd}
  *
  * @param timeDifference join window interval
  * @param afterWindowEnd The grace period to admit out-of-order events to a window.
- * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
  * @return A new JoinWindows object with the specified window definition and grace period
+ * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+ * if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
  */
  public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
-return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
+final String timeDifferenceMsgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");

Review comment:
Thanks for the catch!
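The revised javadoc states two rules: records join when their timestamps are at most `timeDifference` apart (before or after), and a window closes at `windowEnd + afterWindowEnd`. Below is a minimal sketch of those two rules exactly as the javadoc words them, assuming non-negative epoch-millisecond timestamps. `JoinWindowModel`, `joinable`, and `isLate` are hypothetical names, not Kafka Streams API, and treating a record that arrives exactly at window close as on time is an assumption of the sketch.

```java
// Hypothetical model of the join-window rules described in the javadoc above;
// this is not Kafka Streams' JoinWindows implementation.
final class JoinWindowModel {
    private final long timeDifferenceMs;
    private final long graceMs;

    JoinWindowModel(long timeDifferenceMs, long graceMs) {
        // Mirrors the documented checks: neither duration may be negative.
        if (timeDifferenceMs < 0) {
            throw new IllegalArgumentException("timeDifference must not be negative");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("afterWindowEnd must not be negative");
        }
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    // Joinable if the secondary timestamp is at most timeDifference before or after
    // the primary timestamp. Assumes non-negative timestamps, so the subtraction
    // cannot overflow.
    boolean joinable(long primaryTs, long secondaryTs) {
        return Math.abs(primaryTs - secondaryTs) <= timeDifferenceMs;
    }

    // The window around a record ends at recordTs + timeDifference and closes at
    // windowEnd + grace. Comparing elapsed time with the grace period avoids
    // computing windowEnd + graceMs, which could overflow for very large grace values.
    boolean isLate(long recordTs, long streamTimeMs) {
        long windowEndMs = recordTs + timeDifferenceMs; // assumes this sum fits in a long
        return streamTimeMs - windowEndMs > graceMs;
    }
}
```

With the real API, the equivalent configuration would be created through `JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(1))`; the model only restates the documented semantics.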
[GitHub] [kafka] guozhangwang commented on pull request #11089: MINOR: remove unnecessary judgment in AdminUtils::assignReplicasToBrokersRackAware
guozhangwang commented on pull request #11089: URL: https://github.com/apache/kafka/pull/11089#issuecomment-885410437 Overall looks good to me. Just one quick question: do we always guarantee `replicationFactor <= numBrokers`? I know we check for and forbid violations when creating a new topic, but after a topic is created, if we shut down brokers and reassign replicas, would we settle for fewer replicas and warn users? cc @ijuma for another look.
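To illustrate why the `replicationFactor <= numBrokers` invariant matters for an assignment routine, here is a simplified, hypothetical round-robin assignment. It is not the rack-aware algorithm in `AdminUtils` (which, among other things, randomizes starting positions); it only shows that without the guard the modulo wraps around and one broker can appear twice in a partition's replica list. It does not answer the open question about what happens after brokers are shut down.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical round-robin replica assignment; not Kafka's AdminUtils algorithm.
final class ReplicaAssignmentSketch {
    static List<Integer> assignReplicas(List<Integer> brokers, int partition, int replicationFactor) {
        if (replicationFactor <= 0) {
            throw new IllegalArgumentException("Replication factor must be larger than 0");
        }
        // Without this guard, (partition + i) % brokers.size() wraps around and the
        // same broker would be listed more than once for a single partition.
        if (replicationFactor > brokers.size()) {
            throw new IllegalArgumentException("Replication factor " + replicationFactor
                + " is larger than the number of available brokers " + brokers.size());
        }
        List<Integer> replicas = new ArrayList<>(replicationFactor);
        for (int i = 0; i < replicationFactor; i++) {
            replicas.add(brokers.get((partition + i) % brokers.size()));
        }
        return replicas;
    }
}
```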
[GitHub] [kafka] ijuma commented on pull request #11108: KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures
ijuma commented on pull request #11108: URL: https://github.com/apache/kafka/pull/11108#issuecomment-885391600 @hachikuji I addressed your comment; this is ready for another review.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
ableegoldman commented on a change in pull request #10926: URL: https://github.com/apache/kafka/pull/10926#discussion_r675296678

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ##
@@ -177,7 +204,9 @@ public long size() {
  * @param afterWindowEnd The grace period to admit out-of-order events to a window.
  * @return this updated builder
  * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
  */
+@Deprecated
  public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {

Review comment:
I picked up all the non-testing follow-up work in this PR so we could try to get it into 3.0: https://github.com/apache/kafka/pull/11114
[jira] [Commented] (KAFKA-13021) Improve Javadocs for API Changes and address followup from KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385929#comment-17385929 ]

A. Sophie Blee-Goldman commented on KAFKA-13021:

PR: https://github.com/apache/kafka/pull/11114

> Improve Javadocs for API Changes and address followup from KIP-633
>
> Key: KAFKA-13021
> URL: https://issues.apache.org/jira/browse/KAFKA-13021
> Project: Kafka
> Issue Type: Improvement
> Components: documentation, streams
> Affects Versions: 3.0.0
> Reporter: Israel Ekpo
> Assignee: Israel Ekpo
> Priority: Major
>
> There are Javadoc changes from the following PR that need to be completed
> prior to the 3.0 release. This Jira item tracks that work:
> [https://github.com/apache/kafka/pull/10926]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman opened a new pull request #11114: KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback
ableegoldman opened a new pull request #11114: URL: https://github.com/apache/kafka/pull/11114 There were a few follow-up items to address from [#10926](https://github.com/apache/kafka/pull/10926), most importantly a number of fixes needed for the javadocs. Beyond that, it's mostly just adding a few missing verification checks. Given that the whole point of this KIP was to help reduce a major source of confusion, the meaning and usage of the grace period within Streams, it's critical that we have clear and correct javadocs accompanying the new APIs. For that reason I think it's very important to get this into 3.0. It's also very low-risk, as the only non-docs changes are adding a handful of checks that already exist in the old APIs and were just missed in the new APIs.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
ableegoldman commented on a change in pull request #10926: URL: https://github.com/apache/kafka/pull/10926#discussion_r675292037

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ##
@@ -40,7 +40,11 @@
  // By default grace period is 24 hours for all windows,
  // in other words we allow out-of-order data for up to a day
-protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+// This behavior is now deprecated
+protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;

Review comment:
Yeah, it's an internal config, so I hope users wouldn't read anything into the name and extrapolate to what they can and can't use. That said, it does appear in these classes, which are public themselves, so users are still going to see it. But the important thing is that it makes sense to us, the devs, who will actually be using it. I think `DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD` is a bit clearer; we just need to sneak the word "default" in there somewhere, imo.
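The constant's value is plain arithmetic: 24 h × 60 min × 60 s × 1000 ms = 86,400,000 ms. The small sketch below uses the name proposed in the review comment (the name that was finally merged may differ) and cross-checks the value against `java.time.Duration`. Note that `24 * 60 * 60` is evaluated in `int` arithmetic (86,400, nowhere near overflow) and only the final multiplication by `1000L` happens in `long`.

```java
import java.time.Duration;

final class GracePeriodConstants {
    // Name taken from the review suggestion; illustrative only.
    // 24 * 60 * 60 is computed as an int (86,400); multiplying by 1000L widens to long.
    static final long DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;

    // Cross-checks the hand-written arithmetic against java.time.
    static boolean matchesTwentyFourHours() {
        return DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD == Duration.ofHours(24).toMillis();
    }
}
```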
[GitHub] [kafka] ijuma commented on a change in pull request #11108: KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures
ijuma commented on a change in pull request #11108: URL: https://github.com/apache/kafka/pull/11108#discussion_r675289769

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -785,7 +785,7 @@ class KafkaApis(val requestChannel: RequestChannel,
  Some(RecordBatch.MAGIC_VALUE_V1)
  else
  None
- }
+ }.filter(_ => unconvertedRecords.batchIterator.hasNext)

Review comment:
Fixed this.
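The one-line fix applies Scala's `Option.filter` with a predicate that ignores the value, so the computed magic value is kept only when `unconvertedRecords` has at least one batch and otherwise collapses to `None`. A Java analog using `Optional.filter` is sketched below. `needsV1` is a hypothetical stand-in for the condition that the diff context does not show, and the sketch makes no claim about why the check was needed.

```java
import java.util.Optional;

// Java analog of the Scala Option.filter in the diff; names are illustrative.
final class DownConversionSketch {
    // RecordBatch.MAGIC_VALUE_V1 is 1; redefined here so the sketch is self-contained.
    static final byte MAGIC_VALUE_V1 = 1;

    // needsV1 is a hypothetical stand-in for the condition elided from the diff.
    static Optional<Byte> downConvertMagic(boolean needsV1, boolean hasBatches) {
        Optional<Byte> magic;
        if (needsV1) {
            magic = Optional.of(MAGIC_VALUE_V1);
        } else {
            magic = Optional.empty();
        }
        // Like `.filter(_ => unconvertedRecords.batchIterator.hasNext)`: the predicate
        // ignores the value and keeps it only if there is at least one batch.
        return magic.filter(m -> hasBatches);
    }
}
```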
[jira] [Updated] (KAFKA-13021) Improve Javadocs for API Changes and address followup from KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-13021:
Summary: Improve Javadocs for API Changes and address followup from KIP-633 (was: Improve Javadocs for API Changes from KIP-633)

> Improve Javadocs for API Changes and address followup from KIP-633
>
> Key: KAFKA-13021
> URL: https://issues.apache.org/jira/browse/KAFKA-13021
> Project: Kafka
> Issue Type: Improvement
> Components: documentation, streams
> Affects Versions: 3.0.0
> Reporter: Israel Ekpo
> Assignee: Israel Ekpo
> Priority: Major
>
> There are Javadoc changes from the following PR that need to be completed
> prior to the 3.0 release. This Jira item tracks that work:
> [https://github.com/apache/kafka/pull/10926]
[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
ableegoldman commented on a change in pull request #10926: URL: https://github.com/apache/kafka/pull/10926#discussion_r675285980

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ##
@@ -177,7 +204,9 @@ public long size() {
  * @param afterWindowEnd The grace period to admit out-of-order events to a window.
  * @return this updated builder
  * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
  */
+@Deprecated
  public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {

Review comment:
Personally, I think it makes sense to just disallow calling `ofTimeDifferenceAndGrace(...).grace(...)` entirely; this seems like abusing the API.
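One way to implement the suggestion of rejecting `ofTimeDifferenceAndGrace(...).grace(...)` is to record which factory created the instance and fail fast in `grace()`. The class below is a hypothetical sketch, not Kafka's `JoinWindows`. The 24-hour default for the old-style factory is borrowed from the `Windows.java` comment quoted in this thread, and throwing `IllegalStateException` is an assumed design choice.

```java
import java.time.Duration;

// Hypothetical sketch of the reviewer's suggestion; this is not Kafka Streams' JoinWindows.
final class JoinWindowsSketch {
    // Placeholder old-style default, taken from the 24-hour default in Windows.java.
    private static final long OLD_DEFAULT_GRACE_MS = Duration.ofHours(24).toMillis();

    private final long timeDifferenceMs;
    private final long graceMs;
    // True only when the grace period was supplied via ofTimeDifferenceAndGrace(...).
    private final boolean graceFixedByFactory;

    private JoinWindowsSketch(long timeDifferenceMs, long graceMs, boolean graceFixedByFactory) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
        this.graceFixedByFactory = graceFixedByFactory;
    }

    // Old-style entry point: grace can still be adjusted with grace(...).
    static JoinWindowsSketch of(Duration timeDifference) {
        return new JoinWindowsSketch(timeDifference.toMillis(), OLD_DEFAULT_GRACE_MS, false);
    }

    // New-style entry point: grace is part of the factory call and cannot be overridden.
    static JoinWindowsSketch ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd) {
        return new JoinWindowsSketch(timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
    }

    // Fails fast instead of silently overriding a grace period chosen at construction.
    JoinWindowsSketch grace(Duration afterWindowEnd) {
        if (graceFixedByFactory) {
            throw new IllegalStateException(
                "grace(...) cannot be combined with ofTimeDifferenceAndGrace(...)");
        }
        return new JoinWindowsSketch(timeDifferenceMs, afterWindowEnd.toMillis(), false);
    }

    long graceMs() {
        return graceMs;
    }
}
```

Keeping the flag separate from "grace was ever set" means the old-style chain `of(...).grace(...)` keeps working unchanged; only the new factory locks the value in.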
[GitHub] [kafka] ableegoldman commented on pull request #11113: KAFKA-13128: wait for all keys to be fully processed in #shouldQueryStoresAfterAddingAndRemovingStreamThread
ableegoldman commented on pull request #11113: URL: https://github.com/apache/kafka/pull/11113#issuecomment-885356922 cc @PhilHardwick @cadonna @wcarlson5 @lct45 @vvcephei
[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
ableegoldman commented on a change in pull request #11057: URL: https://github.com/apache/kafka/pull/11057#discussion_r675269697

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ##
@@ -907,14 +944,23 @@ private void maybeSetOffsetForLeaderException(RuntimeException e) {
  final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
  for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) {
-RequestFuture future =
-sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
+// we skip sending the list off request only if there's already one with the exact
+// requested offsets for the destination node

Review comment:
Hm... I wonder if deduplicating like this within the Fetcher itself is too low-level, i.e., there may be other callers of `sendListOffsetsRequests` that actually do want to issue a new request. I think there are arguments to be made for doing this for all requests, but maybe also some arguments against it: this is a more drastic change that means APIs like `Consumer#endOffsets` can actually return old/stale results (by up to the configured `request.timeout.ms` at most). Since this is a last-minute blocker fix, I'd prefer to keep the changes to a minimum and scoped to the specific bug, if at all possible. Can we do the deduplication in another layer, so that we only avoid re-sending the listOffsets request in the specific case of `currentLag`, where we know it's acceptable to report a slightly out-of-date value because the alternative is to report no value at all?
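The comment proposes deduplicating list-offsets requests in a layer above the `Fetcher`, so that only a caller such as `currentLag` opts in to reusing an in-flight request. A minimal sketch of such a layer follows, assuming requests are keyed by something like a topic-partition and answered with a `CompletableFuture`. `InFlightDeduplicator` and `request` are hypothetical names and do not correspond to Kafka client internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical opt-in layer that shares one in-flight request per key.
final class InFlightDeduplicator<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    // Returns the future of an identical in-flight request if one exists; otherwise
    // sends a new request via `sender` and shares its result with later callers.
    CompletableFuture<V> request(K key, Function<K, CompletableFuture<V>> sender) {
        CompletableFuture<V> placeholder = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, placeholder);
        if (existing != null) {
            return existing; // reuse: this caller may observe a slightly older result
        }
        CompletableFuture<V> response;
        try {
            response = sender.apply(key);
        } catch (RuntimeException e) {
            inFlight.remove(key, placeholder);
            placeholder.completeExceptionally(e);
            return placeholder;
        }
        response.whenComplete((value, error) -> {
            // Remove first, so callbacks triggered by completion start a fresh request.
            inFlight.remove(key, placeholder);
            if (error != null) {
                placeholder.completeExceptionally(error);
            } else {
                placeholder.complete(value);
            }
        });
        return placeholder;
    }
}
```

A caller that reuses an in-flight future can see a value up to one request round-trip old, which is the staleness trade-off the comment describes; callers that need a fresh answer, such as `endOffsets`, would simply not go through this layer.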
[jira] [Updated] (KAFKA-13129) Fix broken system tests relate to the ConfigCommand change
[ https://issues.apache.org/jira/browse/KAFKA-13129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13129:
Component/s: system tests

> Fix broken system tests relate to the ConfigCommand change
>
> Key: KAFKA-13129
> URL: https://issues.apache.org/jira/browse/KAFKA-13129
> Project: Kafka
> Issue Type: Bug
> Components: system tests
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Blocker
> Fix For: 3.0.0
>
> After KAFKA-12598, the system tests failed in {{upgrade_test}},
> {{zookeeper_tls_encrypt_only_test.py}}, and {{zookeeper_tls_test.py}}. Fix them.
[jira] [Updated] (KAFKA-13129) Fix broken system tests relate to the ConfigCommand change
[ https://issues.apache.org/jira/browse/KAFKA-13129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13129:
Priority: Blocker (was: Major)

> Fix broken system tests relate to the ConfigCommand change
>
> Key: KAFKA-13129
> URL: https://issues.apache.org/jira/browse/KAFKA-13129
> Project: Kafka
> Issue Type: Bug
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Blocker
> Fix For: 3.0.0
>
> After KAFKA-12598, the system tests failed in {{upgrade_test}},
> {{zookeeper_tls_encrypt_only_test.py}}, and {{zookeeper_tls_test.py}}. Fix them.
[jira] [Created] (KAFKA-13129) Fix broken system tests relate to the ConfigCommand change
Luke Chen created KAFKA-13129:

Summary: Fix broken system tests relate to the ConfigCommand change
Key: KAFKA-13129
URL: https://issues.apache.org/jira/browse/KAFKA-13129
Project: Kafka
Issue Type: Bug
Reporter: Luke Chen
Assignee: Luke Chen
Fix For: 3.0.0

After KAFKA-12598, the system tests failed in {{upgrade_test}}, {{zookeeper_tls_encrypt_only_test.py}}, and {{zookeeper_tls_test.py}}. Fix them.
[GitHub] [kafka] vvcephei commented on a change in pull request #11111: KAFKA-13126: guard against overflow when computing `joinGroupTimeoutMs`
vvcephei commented on a change in pull request #11111: URL: https://github.com/apache/kafka/pull/11111#discussion_r675266851

## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java ##
@@ -60,17 +60,16 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long
  throw new IllegalArgumentException("Timeout needs to be greater than 0");
  }
  long startTime = time.milliseconds();
-long expiryTime = startTime + timeoutMs;
  if (isReady(client, node, startTime) || client.ready(node, startTime))
  return true;
  long attemptStartTime = time.milliseconds();
-while (!client.isReady(node, attemptStartTime) && attemptStartTime < expiryTime) {
+while (!client.isReady(node, attemptStartTime) && attemptStartTime - startTime < timeoutMs) {
  if (client.connectionFailed(node)) {
  throw new IOException("Connection to " + node + " failed.");
  }
-long pollTimeout = expiryTime - attemptStartTime;
+long pollTimeout = (startTime - attemptStartTime) + timeoutMs;

Review comment:
Ah, thanks!
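The diff replaces an absolute deadline, `startTime + timeoutMs`, with an elapsed-time check, `attemptStartTime - startTime < timeoutMs`. When `timeoutMs` is close to `Long.MAX_VALUE`, the addition wraps around to a negative number and the old loop condition is false immediately, whereas the subtraction stays in range as long as the clock does not go backwards. The sketch below contrasts the two forms; `DeadlineCheck` and its methods are illustrative names only.

```java
// Illustrative helpers contrasting the old and new loop conditions in the diff.
final class DeadlineCheck {
    // Old shape: build an absolute deadline first. For timeoutMs near Long.MAX_VALUE,
    // startMs + timeoutMs wraps around to a negative value and the check fails at once.
    static boolean naiveHasTimeLeft(long startMs, long nowMs, long timeoutMs) {
        long expiryMs = startMs + timeoutMs;
        return nowMs < expiryMs;
    }

    // New shape: compare elapsed time with the timeout. nowMs - startMs stays in range
    // for realistic, non-decreasing clock readings.
    static boolean hasTimeLeft(long startMs, long nowMs, long timeoutMs) {
        return nowMs - startMs < timeoutMs;
    }

    // Mirrors pollTimeout = (startTime - attemptStartTime) + timeoutMs: subtracting first
    // yields a non-positive difference (assuming nowMs >= startMs), so adding timeoutMs
    // cannot overflow.
    static long remainingMs(long startMs, long nowMs, long timeoutMs) {
        return (startMs - nowMs) + timeoutMs;
    }
}
```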
[GitHub] [kafka] JoeCqupt edited a comment on pull request #11089: MINOR: remove unnecessary judgment in AdminUtils::assignReplicasToBrokersRackAware
JoeCqupt edited a comment on pull request #11089: URL: https://github.com/apache/kafka/pull/11089#issuecomment-884671155 Calling for review: @ijuma @guozhangwang @hachikuji @mjsax @ableegoldman
[jira] [Assigned] (KAFKA-6948) Avoid overflow in timestamp comparison
[ https://issues.apache.org/jira/browse/KAFKA-6948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reassigned KAFKA-6948:
Assignee: A. Sophie Blee-Goldman

> Avoid overflow in timestamp comparison
>
> Key: KAFKA-6948
> URL: https://issues.apache.org/jira/browse/KAFKA-6948
> Project: Kafka
> Issue Type: Improvement
> Reporter: Giovanni Liva
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
>
> Some comparisons with timestamp values are not safe. These comparisons can
> trigger errors that were found in some other issues, e.g. KAFKA-4290 or
> KAFKA-6608.
> The following classes contain comparisons between timestamps that can
> overflow:
> * org.apache.kafka.clients.NetworkClientUtils
> * org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
> * org.apache.kafka.common.security.kerberos.KerberosLogin
> * org.apache.kafka.connect.runtime.WorkerSinkTask
> * org.apache.kafka.connect.tools.MockSinkTask
> * org.apache.kafka.connect.tools.MockSourceTask
> * org.apache.kafka.streams.processor.internals.GlobalStreamThread
> * org.apache.kafka.streams.processor.internals.StateDirectory
> * org.apache.kafka.streams.processor.internals.StreamThread
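Besides rewriting comparisons as elapsed-time checks, another common way to avoid the overflow this issue describes is to saturate the deadline computation: clamp `start + timeout` to `Long.MAX_VALUE` instead of letting it wrap. The sketch uses `Math.addExact`, which throws `ArithmeticException` on `long` overflow. `SaturatingDeadline` is a hypothetical helper and is not presented as what Kafka's `Timer` class does internally.

```java
// Hypothetical helper: computes an absolute deadline that saturates instead of wrapping.
final class SaturatingDeadline {
    static long deadlineMs(long startMs, long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeoutMs must not be negative");
        }
        try {
            return Math.addExact(startMs, timeoutMs);
        } catch (ArithmeticException overflow) {
            // A deadline beyond the representable range means "effectively never".
            return Long.MAX_VALUE;
        }
    }

    // Safe to compare directly because the deadline never wraps to a negative value.
    static boolean expired(long nowMs, long deadlineMs) {
        return nowMs >= deadlineMs;
    }
}
```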
[GitHub] [kafka] ableegoldman opened a new pull request #11113: KAFKA-13128: wait for all keys to be fully processed in #shouldQueryStoresAfterAddingAndRemovingStreamThread
ableegoldman opened a new pull request #11113: URL: https://github.com/apache/kafka/pull/11113 This test is flaky because it waits for all records to be processed for only a single key before issuing IQ lookups and asserting whether data was found. See [this comment](https://issues.apache.org/jira/browse/KAFKA-13128?focusedCommentId=17385841=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17385841) for a full analysis of how this happened. Should be cherry-picked to 3.0 (a test fix to help stabilize the build, @kkonstantine) and to 2.8.
[GitHub] [kafka] showuon commented on pull request #11107: KAFKA-13125: close KeyValueIterator instances in internals tests (part 2)
showuon commented on pull request #11107: URL: https://github.com/apache/kafka/pull/11107#issuecomment-885348053 Failed tests are unrelated:
```
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 16 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
```
[GitHub] [kafka] ableegoldman commented on pull request #5183: KAFKA-6948 - Change comparison to avoid overflow inconsistencies
ableegoldman commented on pull request #5183: URL: https://github.com/apache/kafka/pull/5183#issuecomment-885347981 A number of these fixes are no longer relevant since the switch to Timers (I guess that was #10537); the remaining ones I just tacked onto this PR addressing a different overflow bug. So I think we can close this in favor of [#11111](https://github.com/apache/kafka/pull/11111).
[GitHub] [kafka] ableegoldman closed pull request #5183: KAFKA-6948 - Change comparison to avoid overflow inconsistencies
ableegoldman closed pull request #5183: URL: https://github.com/apache/kafka/pull/5183
[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-13128:
Priority: Blocker (was: Major)

> Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Blocker
> Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/
[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-13128:
Affects Version/s: (was: 3.1.0)

> Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/
[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-13128:
Fix Version/s: 2.8.1, 3.0.0

> Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/
[jira] [Assigned] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reassigned KAFKA-13128:
Assignee: A. Sophie Blee-Goldman

> Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Labels: flaky-test
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/
[jira] [Commented] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385841#comment-17385841 ] A. Sophie Blee-Goldman commented on KAFKA-13128: The failure is from the second line in this series of assertions {code:java} assertThat(store1.get(key), is(notNullValue())); assertThat(store1.get(key2), is(notNullValue())); assertThat(store1.get(key3), is(notNullValue())); {code} which is essentially the first time we attempt IQ after starting up. The test setup starts Streams and waits for it to reach RUNNING, then adds a new thread, and finally produces a set of 100 records for each of the three keys. After that it waits for all records to be processed *for _key3_* and then proceeds to the above assertions. I suspect the problem is that we only wait for all data to be processed for _key3_, but not for the other two keys. In theory this should work, since the data for _key3_ is produced last and would have the largest timestamps, meaning the keys should be processed more or less in order. However, the input topic actually has two partitions, so it could be that _key1_ and _key3_ correspond to task 1 while _key2_ corresponds to task 2. Again, that shouldn't affect the order in which records are processed, as long as the tasks are on the same thread. But we started up a new thread in between waiting for Streams to reach RUNNING and producing data to the input topics. This new thread has to be assigned one of the tasks, but due to cooperative rebalancing it takes two full (though short) rebalances before the new thread can actually start processing any tasks. Therefore, as long as the original thread continues to own the task corresponding to _key3_ after the new thread is added, it can easily get through all records for _key3_, which means the test can proceed to the above assertions while the new thread is still waiting to start processing any data for _key2_ at all.
Given how many things had to happen exactly right in order to see this failure, there are a few ways we could address it, but the simplest fix is to wait for all three keys to be fully processed rather than just the one. This also seems to align best with the original intention of the test. > Flaky Test > StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread > > > Key: KAFKA-13128 > URL: https://issues.apache.org/jira/browse/KAFKA-13128 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0 >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > > h3. Stacktrace > java.lang.AssertionError: Expected: is not null but: was null > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455) > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/
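The fix proposed in this comment, waiting until every key has been fully processed rather than only _key3_, might look roughly like the sketch below. It is self-contained and hypothetical: `waitForCondition`, `allKeysFullyProcessed`, and the in-memory count map stand in for the real test utilities and state store, which are not shown in this thread.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

public class WaitForAllKeys {

    // Hypothetical polling helper: re-evaluates the condition until it holds
    // or the timeout elapses (checked via elapsed time, not an absolute deadline).
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long pollIntervalMs) {
        final long start = System.currentTimeMillis();
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() - start >= timeoutMs) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Checks every key, not just the last one produced.
    static boolean allKeysFullyProcessed(Map<String, Integer> processedCounts,
                                         List<String> keys,
                                         int expectedPerKey) {
        for (String key : keys) {
            if (processedCounts.getOrDefault(key, 0) < expectedPerKey) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        List<String> keys = List.of("key", "key2", "key3");
        // Simulated state: key and key3 are done, key2 is still waiting on the new thread.
        counts.put("key", 100);
        counts.put("key3", 100);
        System.out.println(allKeysFullyProcessed(counts, List.of("key3"), 100)); // true: the too-weak check passes
        System.out.println(allKeysFullyProcessed(counts, keys, 100));            // false: key2 is not done yet
        counts.put("key2", 100);
        System.out.println(waitForCondition(() -> allKeysFullyProcessed(counts, keys, 100), 1_000L, 10L)); // true
    }
}
```

Checking all keys makes the wait robust to whichever thread ends up owning each task, at the cost of a slightly longer wait.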
[jira] [Created] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
A. Sophie Blee-Goldman created KAFKA-13128: -- Summary: Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread Key: KAFKA-13128 URL: https://issues.apache.org/jira/browse/KAFKA-13128 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.1.0 Reporter: A. Sophie Blee-Goldman h3. Stacktrace java.lang.AssertionError: Expected: is not null but: was null at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455) https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/
[GitHub] [kafka] guozhangwang commented on pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks
guozhangwang commented on pull request #2: URL: https://github.com/apache/kafka/pull/2#issuecomment-885335038 @ableegoldman please review.
[GitHub] [kafka] guozhangwang opened a new pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks
guozhangwang opened a new pull request #2: URL: https://github.com/apache/kafka/pull/2 Since we now do not necessarily complete the rebalance within a single poll call, we may keep checking `rejoinNeededOrPending`, which hits either of the conditions and returns true but then returns early, flooding the log with entries. This PR only logs and sets the flag when it is not already set, effectively logging only the first time. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
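The "only set the flag and log the first time" pattern this PR describes can be sketched as follows. The class, field, and method names are hypothetical and only loosely modeled on the consumer coordinator; a counter stands in for the logger so the effect is easy to observe, and the sketch assumes single-threaded access.

```java
public class RejoinTracker {

    private boolean rejoinNeeded = false;
    private int logCount = 0; // stands in for log.info(...) calls

    // Called on every poll; only the call that flips the flag from false to true "logs".
    public void requestRejoinIfNecessary(String reason) {
        if (!rejoinNeeded) {
            rejoinNeeded = true;
            logCount++;
            System.out.println("Requesting rejoin: " + reason);
        }
    }

    // Hypothetical hook: clear the flag once the rebalance has completed.
    public void onJoinComplete() {
        rejoinNeeded = false;
    }

    public boolean rejoinNeeded() { return rejoinNeeded; }

    public int logCount() { return logCount; }

    public static void main(String[] args) {
        RejoinTracker tracker = new RejoinTracker();
        // Many polls in a row while the rebalance is still in progress.
        for (int i = 0; i < 1_000; i++) {
            tracker.requestRejoinIfNecessary("metadata snapshot changed");
        }
        System.out.println(tracker.logCount()); // 1
        tracker.onJoinComplete();
        tracker.requestRejoinIfNecessary("subscription changed");
        System.out.println(tracker.logCount()); // 2
    }
}
```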
[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
guozhangwang commented on pull request #11057: URL: https://github.com/apache/kafka/pull/11057#issuecomment-885328621 I added a concurrent hash map inside the fetcher that tracks the in-flight list-offset futures, and used it to skip sending duplicate requests. LMK if it looks good and I will merge. @vvcephei @ableegoldman
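The deduplication described here can be illustrated with `ConcurrentHashMap.putIfAbsent`: a caller asking for the same partition while a request is still pending gets the existing future instead of triggering another request. This is a sketch under assumptions: partitions are plain strings, the "request" is a `CompletableFuture` completed by hand, and `InFlightDeduplicator`, `endOffset`, and `completeResponse` are hypothetical names, not the Fetcher's actual fields or methods.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class InFlightDeduplicator {

    private final Map<String, CompletableFuture<Long>> inFlight = new ConcurrentHashMap<>();
    private final AtomicInteger requestsSent = new AtomicInteger();

    // Returns the pending future for this partition, "sending" a new request only if none is in flight.
    public CompletableFuture<Long> endOffset(String partition) {
        CompletableFuture<Long> fresh = new CompletableFuture<>();
        CompletableFuture<Long> existing = inFlight.putIfAbsent(partition, fresh);
        if (existing != null) {
            return existing; // a request is already pending; do not send another
        }
        requestsSent.incrementAndGet(); // stands in for actually sending a list-offset request
        // Drop the entry once the response arrives (or fails) so a later call can refresh.
        fresh.whenComplete((offset, error) -> inFlight.remove(partition, fresh));
        return fresh;
    }

    // Simulates the broker response arriving for a partition.
    public void completeResponse(String partition, long offset) {
        CompletableFuture<Long> pending = inFlight.get(partition);
        if (pending != null) {
            pending.complete(offset);
        }
    }

    public int requestsSent() { return requestsSent.get(); }

    public static void main(String[] args) {
        InFlightDeduplicator dedup = new InFlightDeduplicator();
        CompletableFuture<Long> a = dedup.endOffset("topic-0");
        CompletableFuture<Long> b = dedup.endOffset("topic-0");
        System.out.println(a == b);               // true: the pending future is reused
        System.out.println(dedup.requestsSent()); // 1
        dedup.completeResponse("topic-0", 42L);
        System.out.println(a.join());             // 42
        dedup.endOffset("topic-0");
        System.out.println(dedup.requestsSent()); // 2: a new request once the previous one completed
    }
}
```

The completion callback is attached outside of any map-mutating lambda, and the two-argument `remove(key, value)` only removes the entry if it still maps to the same future.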
[jira] [Updated] (KAFKA-13127) Fix stray partition lookup logic
[ https://issues.apache.org/jira/browse/KAFKA-13127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13127: Description: The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It returns all of the non-stray replicas. This causes all of these partitions to get deleted on startup by mistake. (was: The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It returns all of the non-stray replicas. This causes all off these partitions to get deleted on startup by mistake.) > Fix stray partition lookup logic > > > Key: KAFKA-13127 > URL: https://issues.apache.org/jira/browse/KAFKA-13127 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0 > > > The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It > returns all of the non-stray replicas. This causes all of these partitions to > get deleted on startup by mistake.
[jira] [Created] (KAFKA-13127) Fix stray partition lookup logic
Jason Gustafson created KAFKA-13127: --- Summary: Fix stray partition lookup logic Key: KAFKA-13127 URL: https://issues.apache.org/jira/browse/KAFKA-13127 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 3.0.0 The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It returns all of the non-stray replicas. This causes all of these partitions to get deleted on startup by mistake.
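To make the inversion concrete: a stray replica is one present in the broker's local logs but no longer assigned to that broker in the metadata, so the intended lookup is a set difference, and the bug described here returned the complement (the non-stray replicas). The sketch below uses plain string sets; `findStrayReplicas` and `invertedLookup` are hypothetical and do not reflect `BrokerMetadataPublisher`'s actual signature or types.

```java
import java.util.HashSet;
import java.util.Set;

public class StrayReplicaLookup {

    // Intended behavior: stray = on local disk but not assigned to this broker.
    static Set<String> findStrayReplicas(Set<String> localLogs, Set<String> assignedToBroker) {
        Set<String> strays = new HashSet<>(localLogs);
        strays.removeAll(assignedToBroker);
        return strays;
    }

    // Illustrates the effect of the inverted result: the still-assigned (non-stray) replicas.
    static Set<String> invertedLookup(Set<String> localLogs, Set<String> assignedToBroker) {
        Set<String> result = new HashSet<>(localLogs);
        result.retainAll(assignedToBroker);
        return result;
    }

    public static void main(String[] args) {
        Set<String> local = Set.of("foo-0", "foo-1", "bar-0");
        Set<String> assigned = Set.of("foo-0", "foo-1");
        System.out.println(findStrayReplicas(local, assigned)); // [bar-0]
        System.out.println(invertedLookup(local, assigned));    // foo-0 and foo-1, which would be deleted by mistake
    }
}
```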
[GitHub] [kafka] ableegoldman commented on a change in pull request #11111: KAFKA-13126: guard against overflow when computing `joinGroupTimeoutMs`
ableegoldman commented on a change in pull request #1: URL: https://github.com/apache/kafka/pull/1#discussion_r675241237 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java ## @@ -60,17 +60,16 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long throw new IllegalArgumentException("Timeout needs to be greater than 0"); } long startTime = time.milliseconds(); -long expiryTime = startTime + timeoutMs; if (isReady(client, node, startTime) || client.ready(node, startTime)) return true; long attemptStartTime = time.milliseconds(); -while (!client.isReady(node, attemptStartTime) && attemptStartTime < expiryTime) { +while (!client.isReady(node, attemptStartTime) && attemptStartTime - startTime < timeoutMs) { if (client.connectionFailed(node)) { throw new IOException("Connection to " + node + " failed."); } -long pollTimeout = expiryTime - attemptStartTime; +long pollTimeout = (startTime - attemptStartTime) + timeoutMs; Review comment: The `startTime` is set once at the beginning of the method, while the `attemptStartTime` is initialized just before the first attempt and then updated again after every iteration. So the `attemptStartTime` is never less than the `startTime`, and therefore the quantity being added to the `timeoutMs` here is zero or negative, so the sum can never exceed `timeoutMs`. But I see how that's confusing; I'll refactor the expression to make this clearer.
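The arithmetic behind this reply can be checked directly. With `timeoutMs = Long.MAX_VALUE`, the old `expiryTime = startTime + timeoutMs` wraps to a negative number, so the `attemptStartTime < expiryTime` check fails immediately. Comparing elapsed time instead keeps every intermediate value in range, because `startTime - attemptStartTime` is never positive (assuming the clock does not go backwards). The sketch below uses fixed, made-up timestamps rather than a real `Time` instance, and the method names are illustrative only.

```java
public class OverflowSafeTimeout {

    // Old style: compute an absolute expiry, which overflows for huge timeouts.
    static boolean stillWaitingOld(long startTime, long now, long timeoutMs) {
        long expiryTime = startTime + timeoutMs; // wraps negative when timeoutMs is Long.MAX_VALUE
        return now < expiryTime;
    }

    // New style: compare elapsed time against the timeout.
    static boolean stillWaitingNew(long startTime, long now, long timeoutMs) {
        return now - startTime < timeoutMs;
    }

    // Remaining time: (startTime - now) <= 0 when now >= startTime,
    // so adding timeoutMs cannot exceed Long.MAX_VALUE.
    static long pollTimeout(long startTime, long now, long timeoutMs) {
        return (startTime - now) + timeoutMs;
    }

    public static void main(String[] args) {
        long start = 1_627_000_000_000L; // arbitrary wall-clock millis
        long now = start + 250;
        System.out.println(stillWaitingOld(start, now, Long.MAX_VALUE)); // false: the expiry overflowed
        System.out.println(stillWaitingNew(start, now, Long.MAX_VALUE)); // true
        System.out.println(pollTimeout(start, now, Long.MAX_VALUE));     // 9223372036854775557
    }
}
```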
[jira] [Assigned] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-7497: - Assignee: (was: A. Sophie Blee-Goldman) > Kafka Streams should support self-join on streams > - > > Key: KAFKA-7497 > URL: https://issues.apache.org/jira/browse/KAFKA-7497 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Robin Moffatt >Priority: Major > Labels: needs-kip > > There are valid reasons to want to join a stream to itself, but Kafka Streams > does not currently support this ({{Invalid topology: Topic foo has already > been registered by another source.}}). Performing the join requires creating > a second stream as a clone of the first and then joining the two. This is a > clunky workaround that results in unnecessary duplication of data.
[GitHub] [kafka] showuon commented on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases
showuon commented on pull request #10811: URL: https://github.com/apache/kafka/pull/10811#issuecomment-885307107 Let me handle it!
[GitHub] [kafka] vvcephei commented on a change in pull request #11111: KAFKA-13126: guard against overflow when computing `joinGroupTimeoutMs`
vvcephei commented on a change in pull request #1: URL: https://github.com/apache/kafka/pull/1#discussion_r675220503 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java ## @@ -60,17 +60,16 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long throw new IllegalArgumentException("Timeout needs to be greater than 0"); } long startTime = time.milliseconds(); -long expiryTime = startTime + timeoutMs; if (isReady(client, node, startTime) || client.ready(node, startTime)) return true; long attemptStartTime = time.milliseconds(); -while (!client.isReady(node, attemptStartTime) && attemptStartTime < expiryTime) { +while (!client.isReady(node, attemptStartTime) && attemptStartTime - startTime < timeoutMs) { if (client.connectionFailed(node)) { throw new IOException("Connection to " + node + " failed."); } -long pollTimeout = expiryTime - attemptStartTime; +long pollTimeout = (startTime - attemptStartTime) + timeoutMs; Review comment: This would still overflow if `timeoutMs` is `MAX_VALUE`, right?
[jira] [Updated] (KAFKA-13126) Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances
[ https://issues.apache.org/jira/browse/KAFKA-13126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13126: --- Description: In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this override, users of both the plain consumer client and kafka streams still set the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an overflow when computing the {{joinGroupTimeoutMs}} and results in it being set to the {{request.timeout.ms}} instead, which is much lower. This can easily make consumers drop out of the group, since they must rejoin now within 30s (by default) but have no obligation to almost ever call poll() given the high {{max.poll.interval.ms}} – basically they will only do so after processing the last record from the previously polled batch. So in heavy processing cases, where each record takes a long time to process, or when using a very large {{max.poll.records}}, it can be difficult to make any progress at all before dropping out and needing to rejoin. And of course, the rebalance that is kicked off upon this member rejoining can result in many of the other members in the group dropping out as well, leading to an endless cycle of missed rebalances. We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when it occurs. The workaround until then is of course to just set the {{max.poll.interval.ms}} to MAX_VALUE - 5000 (5s is the JOIN_GROUP_TIMEOUT_LAPSE) was: In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this override, users of both the plain consumer client and kafka streams still set the poll interval to MAX_VALUE somewhat often. 
Unfortunately, this causes an overflow when computing the {{joinGroupTimeoutMs}} and results in it being set to the {{request.timeout.ms}} instead, which is much lower. This can easily make consumers drop out of the group, since they must rejoin now within 30s (by default) but have no obligation to almost ever call poll() given the high {{max.poll.interval.ms}} – basically they will only do so after processing the last record from the previously polled batch. So in heavy processing cases, where each record takes a long time to process, or when using a very large {{max.poll.records}}, it can be difficult to make any progress at all before dropping out and needing to rejoin. And of course, the rebalance that is kicked off upon this member rejoining can result in many of the other members in the group dropping out as well, leading to an endless cycle of missed rebalances. We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when it occurs. > Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads > to missing rebalances > - > > Key: KAFKA-13126 > URL: https://issues.apache.org/jira/browse/KAFKA-13126 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 3.1.0 > > > In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was > overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this > override, users of both the plain consumer client and kafka streams still set > the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an > overflow when computing the {{joinGroupTimeoutMs}} and results in it being > set to the {{request.timeout.ms}} instead, which is much lower. 
> This can easily make consumers drop out of the group, since they must rejoin > now within 30s (by default) but have no obligation to almost ever call poll() > given the high {{max.poll.interval.ms}} – basically they will only do so > after processing the last record from the previously polled batch. So in > heavy processing cases, where each record takes a long time to process, or > when using a very large {{max.poll.records}}, it can be difficult to make > any progress at all before dropping out and needing to rejoin. And of course, > the rebalance that is kicked off upon this member rejoining can result in > many of the other members in the group dropping out as well, leading to an > endless cycle of missed rebalances. > We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when > it occurs. The workaround until then is of course to just set the > {{max.poll.interval.ms}} to MAX_VALUE - 5000 (5s is the > JOIN_GROUP_TIMEOUT_LAPSE)
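The overflow, the proposed guard, and the workaround can all be illustrated with `int` arithmetic. This sketch assumes, as the description implies, that the join-group timeout is the larger of `request.timeout.ms` and `max.poll.interval.ms + JOIN_GROUP_TIMEOUT_LAPSE`; the class and method names are hypothetical, and the long-then-clamp guard is one possible way to "fix it to Integer.MAX_VALUE", not necessarily the patch that was merged.

```java
public class JoinGroupTimeout {

    static final int JOIN_GROUP_TIMEOUT_LAPSE = 5000; // 5s, per the issue description

    // Unguarded: int addition wraps to a negative value, so Math.max picks requestTimeoutMs.
    static int unguarded(int requestTimeoutMs, int maxPollIntervalMs) {
        return Math.max(requestTimeoutMs, maxPollIntervalMs + JOIN_GROUP_TIMEOUT_LAPSE);
    }

    // Guarded: do the addition in long and clamp to Integer.MAX_VALUE on overflow.
    static int guarded(int requestTimeoutMs, int maxPollIntervalMs) {
        long candidate = (long) maxPollIntervalMs + JOIN_GROUP_TIMEOUT_LAPSE;
        int clamped = (int) Math.min(candidate, Integer.MAX_VALUE);
        return Math.max(requestTimeoutMs, clamped);
    }

    public static void main(String[] args) {
        int requestTimeoutMs = 30_000; // the 30s default mentioned in the description
        System.out.println(unguarded(requestTimeoutMs, Integer.MAX_VALUE));        // 30000: the bug
        System.out.println(guarded(requestTimeoutMs, Integer.MAX_VALUE));          // 2147483647
        // Workaround from the description: leave room for the 5s lapse.
        System.out.println(unguarded(requestTimeoutMs, Integer.MAX_VALUE - 5000)); // 2147483647
    }
}
```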
[jira] [Updated] (KAFKA-13126) Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances
[ https://issues.apache.org/jira/browse/KAFKA-13126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13126: --- Description: In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this override, users of both the plain consumer client and kafka streams still set the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an overflow when computing the {{joinGroupTimeoutMs}} and results in it being set to the {{request.timeout.ms}} instead, which is much lower. This can easily make consumers drop out of the group, since they must rejoin now within 30s (by default) but have no obligation to almost ever call poll() given the high {{max.poll.interval.ms}} – basically they will only do so after processing the last record from the previously polled batch. So in heavy processing cases, where each record takes a long time to process, or when using a very large {{max.poll.records}}, it can be difficult to make any progress at all before dropping out and needing to rejoin. And of course, the rebalance that is kicked off upon this member rejoining can result in many of the other members in the group dropping out as well, leading to an endless cycle of missed rebalances. We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when it occurs. was: In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this override, users of both the plain consumer client and kafka streams still set the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an overflow when computing the {{joinGroupTimeoutMs}} and results in it being set to the {{request.timeout.ms}} instead, which is much lower. 
This can easily make consumers drop out of the group, since they must rejoin now within 30s (by default) yet have no obligation to almost ever call poll() given the high {{max.poll.interval.ms}}. We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when it occurs. > Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads > to missing rebalances > - > > Key: KAFKA-13126 > URL: https://issues.apache.org/jira/browse/KAFKA-13126 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 3.1.0 > > > In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was > overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this > override, users of both the plain consumer client and kafka streams still set > the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an > overflow when computing the {{joinGroupTimeoutMs}} and results in it being > set to the {{request.timeout.ms}} instead, which is much lower. > This can easily make consumers drop out of the group, since they must rejoin > now within 30s (by default) but have no obligation to almost ever call poll() > given the high {{max.poll.interval.ms}} – basically they will only do so > after processing the last record from the previously polled batch. So in > heavy processing cases, where each record takes a long time to process, or > when using a very large {{max.poll.records}}, it can be difficult to make > any progress at all before dropping out and needing to rejoin. And of course, > the rebalance that is kicked off upon this member rejoining can result in > many of the other members in the group dropping out as well, leading to an > endless cycle of missed rebalances. > We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when > it occurs.
[jira] [Created] (KAFKA-13126) Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances
A. Sophie Blee-Goldman created KAFKA-13126: -- Summary: Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances Key: KAFKA-13126 URL: https://issues.apache.org/jira/browse/KAFKA-13126 Project: Kafka Issue Type: Bug Components: consumer Reporter: A. Sophie Blee-Goldman Assignee: A. Sophie Blee-Goldman Fix For: 3.1.0 In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this override, users of both the plain consumer client and kafka streams still set the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an overflow when computing the {{joinGroupTimeoutMs}} and results in it being set to the {{request.timeout.ms}} instead, which is much lower. This can easily make consumers drop out of the group, since they must rejoin now within 30s (by default) yet have no obligation to almost ever call poll() given the high {{max.poll.interval.ms}}. We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when it occurs.
[jira] [Updated] (KAFKA-12724) Add 2.8.0 to system tests and streams upgrade tests
[ https://issues.apache.org/jira/browse/KAFKA-12724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12724: - Priority: Blocker (was: Major) > Add 2.8.0 to system tests and streams upgrade tests > --- > > Key: KAFKA-12724 > URL: https://issues.apache.org/jira/browse/KAFKA-12724 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Blocker > > Kafka v2.8.0 is released. We should add this version to the system tests.
[jira] [Updated] (KAFKA-12724) Add 2.8.0 to system tests and streams upgrade tests
[ https://issues.apache.org/jira/browse/KAFKA-12724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12724: - Fix Version/s: 3.0.0 > Add 2.8.0 to system tests and streams upgrade tests > --- > > Key: KAFKA-12724 > URL: https://issues.apache.org/jira/browse/KAFKA-12724 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Blocker > Fix For: 3.0.0 > > > Kafka v2.8.0 is released. We should add this version to the system tests.
[GitHub] [kafka] hachikuji commented on pull request #11101: MINOR: Remove redundant fields in dump log record output
hachikuji commented on pull request #11101: URL: https://github.com/apache/kafka/pull/11101#issuecomment-885262803 @ijuma I've added a test which covers most of the interesting cases.
[GitHub] [kafka] ableegoldman opened a new pull request #11111: HOTFIX: guard against overflow when computing `joinGroupTimeoutMs`
ableegoldman opened a new pull request #1: URL: https://github.com/apache/kafka/pull/1 In older versions of Kafka Streams, the `max.poll.interval.ms` config was overridden by default to `Integer.MAX_VALUE`. Even after we removed this override, users of both the plain consumer client and Kafka Streams still set the poll interval to MAX_VALUE fairly often. Unfortunately, this causes an overflow when computing the `joinGroupTimeoutMs` and results in it being set to the `request.timeout.ms` instead, which is much lower. This can easily make consumers drop out of the group, since they must now rejoin within 30s (by default) yet are almost never required to call poll() given the high `max.poll.interval.ms`. We just need to check for overflow and fix it to `Integer.MAX_VALUE` when it occurs.
[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
ableegoldman commented on a change in pull request #11057: URL: https://github.com/apache/kafka/pull/11057#discussion_r675163448 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition topicPartition) { acquireAndEnsureOpen(); try { final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel); -return lag == null ? OptionalLong.empty() : OptionalLong.of(lag); + +// if the log end offset is not known and hence cannot return lag, +// issue a list offset request for that partition so that next time +// we may get the answer; we do not need to wait for the return value +// since we would not try to poll the network client synchronously +if (lag == null) { +if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) { +log.info("Requesting the log end offset for {} in order to compute lag", topicPartition); +fetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L)); Review comment: Ah sorry, I overlooked that we passed in a timeout of 0 (I originally thought that would throw a TimeoutException, but I see now it would just return, so never mind this). However, I do think it's probably worth taking care not to fire off a million requests per second (possibly a slight exaggeration) when we're just waiting on the same partition(s). It shouldn't be too complicated to avoid sending duplicate requests, so IMO it's not over-optimization... thoughts?
[GitHub] [kafka] guozhangwang commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
guozhangwang commented on a change in pull request #11057: URL: https://github.com/apache/kafka/pull/11057#discussion_r675158219 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition topicPartition) { acquireAndEnsureOpen(); try { final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel); -return lag == null ? OptionalLong.empty() : OptionalLong.of(lag); + +// if the log end offset is not known and hence cannot return lag, +// issue a list offset request for that partition so that next time +// we may get the answer; we do not need to wait for the return value +// since we would not try to poll the network client synchronously +if (lag == null) { +if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) { +log.info("Requesting the log end offset for {} in order to compute lag", topicPartition); +fetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L)); Review comment: I modified the fetcher so that it does not wait for the future to complete; with timer(0) it is not a blocking call. Or did I miss anything?
[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
ableegoldman commented on a change in pull request #11057: URL: https://github.com/apache/kafka/pull/11057#discussion_r675136826 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition topicPartition) { acquireAndEnsureOpen(); try { final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel); -return lag == null ? OptionalLong.empty() : OptionalLong.of(lag); + +// if the log end offset is not known and hence cannot return lag, +// issue a list offset request for that partition so that next time +// we may get the answer; we do not need to wait for the return value +// since we would not try to poll the network client synchronously +if (lag == null) { +if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) { +log.info("Requesting the log end offset for {} in order to compute lag", topicPartition); +fetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L)); Review comment: Isn't this actually a blocking call? I couldn't find anything that asserted yes or no in the javadocs, but this ultimately calls down into `Fetcher#fetchOffsetsByTimes` which does seem to wait for the request future to complete (in fact, it seems to be doing a busy wait on the timer...? that doesn't seem right)
[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
ableegoldman commented on a change in pull request #11057: URL: https://github.com/apache/kafka/pull/11057#discussion_r675136826 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition topicPartition) { acquireAndEnsureOpen(); try { final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel); -return lag == null ? OptionalLong.empty() : OptionalLong.of(lag); + +// if the log end offset is not known and hence cannot return lag, +// issue a list offset request for that partition so that next time +// we may get the answer; we do not need to wait for the return value +// since we would not try to poll the network client synchronously +if (lag == null) { +if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) { +log.info("Requesting the log end offset for {} in order to compute lag", topicPartition); +fetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L)); Review comment: Isn't this a blocking call? I couldn't find anything that asserted yes or no in the javadocs, but this ultimately calls down into `Fetcher#fetchOffsetsByTimes`, which does seem to wait for the request future to complete (in fact, it seems to be doing a busy wait on the timer...? That doesn't seem right.)
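The diff comment states the intent: when the end offset is unknown, issue a list-offsets request and return right away, so a later call may have the answer. As a minimal stdlib-only sketch of that "return what we know, refresh in the background" behaviour (it is not Kafka's `Fetcher` or `SubscriptionState`; `LagTracker`, `requestEndOffset`, and `requestsSent` are hypothetical names), the lookup below never waits on the future and sends at most one request per partition at a time:

```java
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of a non-blocking lag lookup; none of these names are Kafka APIs.
class LagTracker {
    private final ConcurrentHashMap<String, Long> endOffsets = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Long> positions = new ConcurrentHashMap<>();
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();
    // Stand-in for an asynchronous ListOffsets request.
    private final Function<String, CompletableFuture<Long>> requestEndOffset;
    final AtomicInteger requestsSent = new AtomicInteger();

    LagTracker(Function<String, CompletableFuture<Long>> requestEndOffset) {
        this.requestEndOffset = requestEndOffset;
    }

    void updatePosition(String partition, long position) {
        positions.put(partition, position);
    }

    OptionalLong currentLag(String partition) {
        Long end = endOffsets.get(partition);
        if (end == null) {
            // Send at most one request per partition and never wait on it;
            // the callback stores the end offset for a later call.
            if (inFlight.add(partition)) {
                requestsSent.incrementAndGet();
                requestEndOffset.apply(partition).whenComplete((offset, error) -> {
                    if (error == null) {
                        endOffsets.put(partition, offset);
                    }
                    inFlight.remove(partition);
                });
            }
            return OptionalLong.empty();
        }
        Long position = positions.get(partition);
        return position == null ? OptionalLong.empty() : OptionalLong.of(end - position);
    }
}
```

With a position of 40, the first calls return an empty `OptionalLong`; once the request future completes with 100, the next call returns a lag of 60. Whether the real `endOffsets(..., timer(0L))` path behaves this way is exactly what the thread is questioning.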
[GitHub] [kafka] d8tltanc edited a comment on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings
d8tltanc edited a comment on pull request #11002: URL: https://github.com/apache/kafka/pull/11002#issuecomment-885177467 kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once, kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta: try running them on master without the idempotent default changes.
[GitHub] [kafka] d8tltanc edited a comment on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings
d8tltanc edited a comment on pull request #11002: URL: https://github.com/apache/kafka/pull/11002#issuecomment-885177467
[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings
d8tltanc commented on pull request #11002: URL: https://github.com/apache/kafka/pull/11002#issuecomment-885177467 kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once, kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta: try running them on a branch without the idempotent default changes.
[GitHub] [kafka] ccding opened a new pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ccding opened a new pull request #0: URL: https://github.com/apache/kafka/pull/0 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-12985) CVE-2021-28169 - Upgrade jetty to 9.4.42
[ https://issues.apache.org/jira/browse/KAFKA-12985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-12985: -- Summary: CVE-2021-28169 - Upgrade jetty to 9.4.42 (was: CVE-2021-28169 - Upgrade jetty to 9.4.41) > CVE-2021-28169 - Upgrade jetty to 9.4.42 > > > Key: KAFKA-12985 > URL: https://issues.apache.org/jira/browse/KAFKA-12985 > Project: Kafka > Issue Type: Task > Components: security >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > CVE-2021-28169 vulnerability affects Jetty versions up to 9.4.40. For more > information see https://nvd.nist.gov/vuln/detail/CVE-2021-28169 > Upgrading to Jetty version 9.4.41 should address this issue > (https://github.com/eclipse/jetty.project/security/advisories/GHSA-gwcr-j4wh-j3cq). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12985) CVE-2021-28169 - Upgrade jetty to 9.4.41
[ https://issues.apache.org/jira/browse/KAFKA-12985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-12985. --- Fix Version/s: 2.8.1 2.7.2 3.0.0 Resolution: Fixed > CVE-2021-28169 - Upgrade jetty to 9.4.41 > > > Key: KAFKA-12985 > URL: https://issues.apache.org/jira/browse/KAFKA-12985 > Project: Kafka > Issue Type: Task > Components: security >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > CVE-2021-28169 vulnerability affects Jetty versions up to 9.4.40. For more > information see https://nvd.nist.gov/vuln/detail/CVE-2021-28169 > Upgrading to Jetty version 9.4.41 should address this issue > (https://github.com/eclipse/jetty.project/security/advisories/GHSA-gwcr-j4wh-j3cq).
[jira] [Commented] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance
[ https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385712#comment-17385712 ] Kowshik Prakasam commented on KAFKA-13070: -- [~manasvigupta] I didn't realize you had assigned it to yourself. I was actually planning on working on this myself. Are you working on a fix for this relatively soon? If not, I will address it with a PR soon. Please let me know. > LogManager shutdown races with periodic work scheduled by the instance > -- > > Key: KAFKA-13070 > URL: https://issues.apache.org/jira/browse/KAFKA-13070 > Project: Kafka > Issue Type: Bug >Reporter: Kowshik Prakasam >Assignee: Manasvi Gupta >Priority: Major > > In the LogManager shutdown sequence (in LogManager.shutdown()), we don't > cancel the periodic work scheduled by it prior to shutdown. As a result, the > periodic work could race with the shutdown sequence, causing some unwanted > side effects. This is reproducible by a unit test in LogManagerTest.
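The issue describes periodic work that keeps running while `LogManager.shutdown()` tears things down. A minimal sketch of the usual ordering, assuming a plain `ScheduledExecutorService` rather than Kafka's own scheduler (`PeriodicWorkOwner` and its fields are hypothetical): cancel the periodic tasks, wait for any in-progress run to finish, and only then release the resources the work depends on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical component that owns periodic work; not Kafka's LogManager.
class PeriodicWorkOwner {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final List<ScheduledFuture<?>> tasks = new ArrayList<>();
    private final AtomicBoolean resourcesClosed = new AtomicBoolean(false);
    final AtomicInteger runs = new AtomicInteger();
    final AtomicInteger runsAfterClose = new AtomicInteger();

    void startup() {
        tasks.add(scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            // A run that observes closed resources is the race described in the issue.
            if (resourcesClosed.get()) {
                runsAfterClose.incrementAndGet();
            }
        }, 0, 1, TimeUnit.MILLISECONDS));
    }

    void shutdown() {
        // 1. Stop future runs of the periodic work.
        for (ScheduledFuture<?> task : tasks) {
            task.cancel(false);
        }
        // 2. Wait for a run that is already executing to finish.
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        // 3. Only now release what the periodic work depends on.
        resourcesClosed.set(true);
    }
}
```

Reversing steps 1 to 3 (closing resources first) is what allows a scheduled run to observe a half-shut-down component.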
[GitHub] [kafka] jsancio commented on a change in pull request #11109: KAFKA-13113: Support unregistering Raft listeners
jsancio commented on a change in pull request #11109: URL: https://github.com/apache/kafka/pull/11109#discussion_r675065535 ## File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java ## @@ -362,24 +363,27 @@ LeaderAndEpoch notifiedLeader() { } void handleCommit(MemoryBatchReader reader) { -listener.handleCommit(reader); +listener.handleCommit(this, reader); offset = reader.lastOffset().getAsLong(); } void handleSnapshot(SnapshotReader reader) { -listener.handleSnapshot(reader); +listener.handleSnapshot(this, reader); offset = reader.lastContainedLogOffset(); } void handleLeaderChange(long offset, LeaderAndEpoch leader) { -listener.handleLeaderChange(leader); +listener.handleLeaderChange(this, leader); notifiedLeader = leader; this.offset = offset; } void beginShutdown() { -listener.beginShutdown(); +listener.beginShutdown(this); } + +@Override +public void close() {} Review comment: Should fix this by appending an event to eventQueue that removes this listener. ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -2491,6 +2492,47 @@ public void testLateRegisteredListenerCatchesUp() throws Exception { assertEquals(9L, secondListener.claimedEpochStartOffset(epoch)); } +@Test +public void testReregistrationChangesListenerContext() throws Exception { +int localId = 0; +int otherNodeId = 1; +int epoch = 5; +Set voters = Utils.mkSet(localId, otherNodeId); + +List batch1 = Arrays.asList("1", "2", "3"); +List batch2 = Arrays.asList("4", "5", "6"); +List batch3 = Arrays.asList("7", "8", "9"); + +RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.appendToLog(1, batch1) +.appendToLog(1, batch2) +.appendToLog(2, batch3) +.withUnknownLeader(epoch - 1) +.build(); + +context.becomeLeader(); +context.client.poll(); +assertEquals(10L, context.log.endOffset().offset); + +// Let the initial listener catch up +context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0)); 
Review comment: Use the helper method "advance high-watermark" in a few of these places.
[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API
jeqo commented on a change in pull request #11099: URL: https://github.com/apache/kafka/pull/11099#discussion_r675036126 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java ## @@ -156,7 +156,7 @@ public void kTableShouldLogAndMeterOnSkippedRecords() { .filter(e -> e.getLevel().equals("WARN")) .map(Event::getMessage) .collect(Collectors.toList()), -hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]") +hasItem("Skipping record due to null key. value=[value] topic=[topic] partition=[0] offset=[0]") Review comment: > On another note, I guess we could add a test for the other (new) code path when the metadata is absent. I'll leave it up to you. Happy to add more tests. What I'm wondering is how to test scenarios where no context is added. Does TopologyTestDriver allow nullifying this?
[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API
jeqo commented on a change in pull request #11099: URL: https://github.com/apache/kafka/pull/11099#discussion_r675034484 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java ## @@ -90,6 +92,13 @@ public String newStoreName(final String prefix) { "store-" ); +final ProcessorSupplier processorSupplier = () -> +new ContextualProcessor() { +@Override +public void process(final Record record) { +} Review comment: Looking at the tests, they are more about thread and store states than value checking. Should we add some store updates as part of the processor for _more_ correctness?
[GitHub] [kafka] tang7526 commented on pull request #11062: KAFKA-13094: Session windows do not consider user-specified grace when computing retention time for changelog
tang7526 commented on pull request #11062: URL: https://github.com/apache/kafka/pull/11062#issuecomment-885088407 > @tang7526 Could you rebase this PR on the latest 2.8 branch? @cadonna Done. I've already rebased this PR on the latest 2.8 branch.
[GitHub] [kafka] jolshan commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance
jolshan commented on a change in pull request #11097: URL: https://github.com/apache/kafka/pull/11097#discussion_r675011109 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) + +fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) +fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0))) + +val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0, + new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)) +val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch = 0, highWatermark = 2L) +fetcher.setLeaderState(partition, leaderState) + +// Do work for the first time. This should result in all partitions in error. +val timeBeforeFirst = System.currentTimeMillis() +fetcher.doWork() +val timeAfterFirst = System.currentTimeMillis() +val firstWork = timeAfterFirst - timeBeforeFirst + +// The second doWork will pause for fetchBackOffMs since all partitions will be delayed +val timeBeforeSecond = System.currentTimeMillis() +fetcher.doWork() +val timeAfterSecond = System.currentTimeMillis() Review comment: Ah hmm. It does seem to be a little flaky for the second check (fetchBackOffMs < secondWorkDuration). In a sample of 50 tests I ran with backOffMs = 500, there were 8 failures and it seems like all of them had secondWorkDuration = 500. 
So maybe I can just change it to <=. Rerunning with this setup 200 times and with fetchBackOffMs=250, I saw 0 failures. Of course, this was all local. I'm not sure if Jenkins will behave differently.
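The failures reported above all had a second `doWork` duration exactly equal to the backoff, which a strict `fetchBackOffMs < secondWorkDuration` check rejects. A minimal sketch of the inclusive check, written in Java rather than the test's Scala and using hypothetical helpers (`BackoffTiming`, `pauseFor`) in place of `doWork()`: it measures with `System.nanoTime()`, which is monotonic, unlike `System.currentTimeMillis()`, and truncates to whole milliseconds, so a pause of exactly the backoff can measure as exactly the backoff.

```java
import java.util.concurrent.TimeUnit;

// Illustrative helpers, not part of AbstractFetcherThreadTest.
final class BackoffTiming {
    private BackoffTiming() {}

    // Stand-in for a doWork() call that pauses for the fetch backoff.
    static void pauseFor(long backoffMs) {
        try {
            Thread.sleep(backoffMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Elapsed time in whole milliseconds, measured with the monotonic clock.
    // Truncation means a pause of exactly backoffMs can measure as backoffMs.
    static long elapsedMs(Runnable work) {
        long start = System.nanoTime();
        work.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Inclusive lower bound: the pause must last at least the backoff.
    static boolean pausedForBackoff(long elapsedMs, long backoffMs) {
        return elapsedMs >= backoffMs;
    }
}
```

The sketch still depends on real time: `Thread.sleep` is only as precise as the platform's timers and scheduler, so it reduces flakiness rather than eliminating it. Holding the backoff in a single variable also matches the review nit about a `val` for `fetchBackoffMs`.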
[GitHub] [kafka] cadonna commented on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest
cadonna commented on pull request #11103: URL: https://github.com/apache/kafka/pull/11103#issuecomment-885086432 Cherry-picked to 3.0
[GitHub] [kafka] cadonna merged pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest
cadonna merged pull request #11103: URL: https://github.com/apache/kafka/pull/11103
[GitHub] [kafka] jsancio opened a new pull request #11109: KAFKA-13113: Support unregistering Raft listeners
jsancio opened a new pull request #11109: URL: https://github.com/apache/kafka/pull/11109 Support unregistering by returning a ListenerContext on registration and exposing a close method on the returned ListenerContext. To allow the user to use the same Listener for different registrations, the associated ListenerContext is passed to all of the methods defined by the Raft Listener. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
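The PR description outlines the design: registration returns a context whose `close()` unregisters, and every callback receives the context, so one listener object can be registered more than once and still tell the registrations apart. A review comment on this PR also suggests that `close()` should append an event to the event queue that removes the listener. The sketch below models both ideas with hypothetical types (`ToyRaftClient`, `Registration`, and a simplified `handleCommit(context, lastOffset)` signature); it is not the actual `RaftClient` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical types modelled on the PR description; not Kafka's RaftClient API.
interface ListenerContext extends AutoCloseable {
    @Override
    void close(); // unregisters this registration; no checked exception
}

interface Listener {
    void handleCommit(ListenerContext context, long lastOffset);
}

class ToyRaftClient {
    private final Queue<Runnable> eventQueue = new ArrayDeque<>();
    private final List<Registration> registrations = new ArrayList<>();

    private final class Registration implements ListenerContext {
        final Listener listener;

        Registration(Listener listener) {
            this.listener = listener;
        }

        @Override
        public void close() {
            // Removal is queued like any other event, so it is applied on the
            // client's event-handling path rather than in the middle of delivery.
            eventQueue.add(() -> registrations.remove(this));
        }
    }

    ListenerContext register(Listener listener) {
        Registration registration = new Registration(listener);
        eventQueue.add(() -> registrations.add(registration));
        return registration;
    }

    // Drains queued events, then notifies each live registration with its own context.
    void poll(long committedOffset) {
        Runnable event;
        while ((event = eventQueue.poll()) != null) {
            event.run();
        }
        for (Registration r : new ArrayList<>(registrations)) {
            r.listener.handleCommit(r, committedOffset);
        }
    }

    int listenerCount() {
        return registrations.size();
    }
}
```

Registering the same `Listener` twice yields two distinct contexts; closing one of them stops only that registration's callbacks after the next `poll`.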
[GitHub] [kafka] cadonna commented on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest
cadonna commented on pull request #11103: URL: https://github.com/apache/kafka/pull/11103#issuecomment-885084926 Test failures are unrelated and known to be flaky.
[GitHub] [kafka] hachikuji commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance
hachikuji commented on a change in pull request #11097: URL: https://github.com/apache/kafka/pull/11097#discussion_r674981462 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) Review comment: nit: create a `val` for `fetchBackoffMs`
[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
hachikuji commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r675005086 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int, } } - def enableTransactionalIdExpiration(): Unit = { -scheduler.schedule("transactionalId-expiration", () => { - val now = time.milliseconds() - inReadLock(stateLock) { -val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] = - transactionMetadataCache.flatMap { case (_, entry) => -entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match { - case Empty | CompleteCommit | CompleteAbort => true - case _ => false -} -}.filter { case (_, txnMetadata) => - txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs -}.map { case (transactionalId, txnMetadata) => - val txnMetadataTransition = txnMetadata.inLock { -txnMetadata.prepareDead() + private def collectExpiredTransactionalIds( +partitionId: Int, +partitionCacheEntry: TxnMetadataCacheEntry + ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = { +val currentTimeMs = time.milliseconds() + +inReadLock(stateLock) { + val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) + replicaManager.getLogConfig(transactionPartition) match { +case Some(logConfig) => + val maxBatchSize = logConfig.maxMessageSize + val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata] + + lazy val recordsBuilder = MemoryRecords.builder( +ByteBuffer.allocate(math.min(16384, maxBatchSize)), Review comment: Note that the buffer will still grow to reach the limit of max.message.bytes. I agree, however, that one hour is a long time to wait. Let me look into triggering the next run right away.
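In the diff, the records buffer starts at `math.min(16384, maxBatchSize)`, with `maxBatchSize` taken from the log's `maxMessageSize`, and the comment notes that it can still grow up to `max.message.bytes`. A minimal sketch of splitting expired-ID tombstones so that each batch stays within that limit, assuming each tombstone is represented only by its serialized size and ignoring per-batch header overhead and compression (`TombstoneBatcher` is hypothetical, not coordinator code). Returning every batch at once loosely corresponds to writing the remainder immediately instead of deferring it to the next scheduled run.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: tombstones are modelled by their serialized sizes in bytes.
final class TombstoneBatcher {
    private TombstoneBatcher() {}

    // Greedily fills batches so each stays within maxBatchSize bytes.
    static List<List<Integer>> batches(List<Integer> recordSizes, int maxBatchSize) {
        List<List<Integer>> result = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentBytes = 0;
        for (int size : recordSizes) {
            if (size > maxBatchSize) {
                throw new IllegalArgumentException("record of " + size + " bytes exceeds the max batch size");
            }
            if (currentBytes + size > maxBatchSize) {
                // The current batch is full; start a new one.
                result.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    // Mirrors the diff's initial allocation: start small, never above the limit.
    static int initialBufferSize(int maxBatchSize) {
        return Math.min(16384, maxBatchSize);
    }
}
```

For example, sizes `[400, 400, 400, 100]` with a 1000-byte limit split into `[400, 400]` and `[400, 100]`.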
[GitHub] [kafka] cadonna edited a comment on pull request #11062: KAFKA-13094: Session windows do not consider user-specified grace when computing retention time for changelog
cadonna edited a comment on pull request #11062: URL: https://github.com/apache/kafka/pull/11062#issuecomment-885080580 @tang7526 Could you rebase this PR on the latest 2.8 branch?
[GitHub] [kafka] cadonna commented on pull request #11062: KAFKA-13094: Session windows do not consider user-specified grace when computing retention time for changelog
cadonna commented on pull request #11062: URL: https://github.com/apache/kafka/pull/11062#issuecomment-885080580 @tang7526 Could you rebase this PR on the latest 2.8 branch?
[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API
jeqo commented on a change in pull request #11099: URL: https://github.com/apache/kafka/pull/11099#discussion_r675002211 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java ## @@ -92,33 +98,62 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte } @Override -public void process(final K key, final V value) { +public void process(final Record record) { // if the key is null, then ignore the record -if (key == null) { -LOG.warn( -"Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]", -context().topic(), context().partition(), context().offset() -); +if (record.key() == null) { +if (context.recordMetadata().isPresent()) { +final RecordMetadata recordMetadata = context.recordMetadata().get(); +LOG.warn( +"Skipping record due to null key. " ++ "value=[{}] topic=[{}] partition=[{}] offset=[{}]", Review comment: No worries. This is actually a good finding, as there are 16 other places where we are logging values and keys. Maybe we should create an issue to handle this in another PR? [image: https://user-images.githubusercontent.com/6180701/126681148-2bd07547-5f56-4cc8-a9f1-88b4826b983e.png]
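The new `process(Record)` branches on whether `context.recordMetadata()` is present, and another comment in this PR asks how to test the branch where the metadata is absent. One option, sketched here under assumptions, is to build the warning text in a pure function that takes an `Optional`, so both branches can be unit-tested without a `TopologyTestDriver`. `SourceMetadata` and `NullKeyWarning` are hypothetical stand-ins (not Kafka Streams' `RecordMetadata`), and the wording of the metadata-absent message is invented for illustration.

```java
import java.util.Optional;

// Hypothetical stand-in for record metadata (topic, partition, offset).
final class SourceMetadata {
    final String topic;
    final int partition;
    final long offset;

    SourceMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class NullKeyWarning {
    private NullKeyWarning() {}

    // Pure function: both branches are testable without a running topology.
    static String message(Object value, Optional<SourceMetadata> metadata) {
        if (metadata.isPresent()) {
            SourceMetadata m = metadata.get();
            return String.format(
                "Skipping record due to null key. value=[%s] topic=[%s] partition=[%d] offset=[%d]",
                value, m.topic, m.partition, m.offset);
        }
        // Hypothetical wording for the path where no metadata is available.
        return String.format(
            "Skipping record due to null key. value=[%s] (topic, partition, and offset unavailable)", value);
    }
}
```

The metadata-present branch reproduces the message that `KTableSourceTest` asserts on; whether record values should appear in these warnings at all is the question this comment proposes to track in a separate issue.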
[GitHub] [kafka] jolshan commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance
jolshan commented on a change in pull request #11097: URL: https://github.com/apache/kafka/pull/11097#discussion_r675000400 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) + +fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) +fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0))) + +val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0, + new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)) +val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch = 0, highWatermark = 2L) +fetcher.setLeaderState(partition, leaderState) + +// Do work for the first time. This should result in all partitions in error. +val timeBeforeFirst = System.currentTimeMillis() +fetcher.doWork() +val timeAfterFirst = System.currentTimeMillis() +val firstWork = timeAfterFirst - timeBeforeFirst + +// The second doWork will pause for fetchBackOffMs since all partitions will be delayed +val timeBeforeSecond = System.currentTimeMillis() +fetcher.doWork() +val timeAfterSecond = System.currentTimeMillis() Review comment: As mentioned above, the typical time to run the first doWork was about 30ms on my machine. I can run a batch of these tests to ensure it is not flaky.
[GitHub] [kafka] jolshan commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance
jolshan commented on a change in pull request #11097: URL: https://github.com/apache/kafka/pull/11097#discussion_r674999603 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) Review comment: Sure. I just went with the default config value. Would 500 still be too high? In my testing, the non-delayed call usually took 30ms on my local machine, so we have quite a ways to go before we get close.
[GitHub] [kafka] hachikuji commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance
hachikuji commented on a change in pull request #11097: URL: https://github.com/apache/kafka/pull/11097#discussion_r674990877 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) Review comment: nit: can you create a `val` for `fetchBackoffMs`? Then we can use it below as well. Also, could we use a lower value for the backoff? Would that make the test faster?
[GitHub] [kafka] jolshan commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance
jolshan commented on a change in pull request #11097: URL: https://github.com/apache/kafka/pull/11097#discussion_r674999603 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) Review comment: Sure. I just went with the default value. Would 500 still be too high? In my testing, the non-delayed call usually took 30ms on my local machine, so we have quite a ways to go before we get close.
[GitHub] [kafka] hachikuji commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance
hachikuji commented on a change in pull request #11097: URL: https://github.com/apache/kafka/pull/11097#discussion_r674994728 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) + +fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) +fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0))) + +val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0, + new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)) +val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch = 0, highWatermark = 2L) +fetcher.setLeaderState(partition, leaderState) + +// Do work for the first time. This should result in all partitions in error. +val timeBeforeFirst = System.currentTimeMillis() +fetcher.doWork() +val timeAfterFirst = System.currentTimeMillis() +val firstWork = timeAfterFirst - timeBeforeFirst + +// The second doWork will pause for fetchBackOffMs since all partitions will be delayed +val timeBeforeSecond = System.currentTimeMillis() +fetcher.doWork() +val timeAfterSecond = System.currentTimeMillis() Review comment: The reliance on real time is a little annoying. Have you run this a few times to make sure it is not flaky? 
## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) Review comment: nit: can you create a `val` for `fetchBackoffMs`? Then we can use it below as well. Also, could we use a lower value for the backoff? ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) Review comment: nit: create a `val` for `fetchBackoffMs` ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -144,6 +144,44 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.highWatermark) } + @Test + def testDelay(): Unit = { +val partition = new TopicPartition("topic", 0) + +class ErrorMockFetcherThread(fetchBackOffMs: Int) + extends MockFetcherThread(fetchBackOffMs = fetchBackOffMs) { + + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, 
FetchData] = { + throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") + } +} +val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000) + +fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) +fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0))) + +val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0, + new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)) +val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch = 0, highWatermark = 2L) +fetcher.setLeaderState(partition, leaderState) + +// Do work for the first time. This should result in all partitions in error. +val timeBeforeFirst = System.currentTimeMillis() +fetcher.doWork() +val
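hachikuji's concern about relying on real time can be addressed by making the clock injectable, so a test asserts on the computed backoff instead of on elapsed wall-clock time. A minimal Java sketch under stated assumptions: `PartitionBackoff` and `ManualClock` are hypothetical helpers, not Kafka's API, and whether `MockFetcherThread` can accept an injected clock (Kafka's test utilities do include `MockTime`) is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical manual clock: time moves only when the test calls advance().
final class ManualClock implements LongSupplier {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    void advance(long deltaMs) {
        nowMs += deltaMs;
    }

    @Override
    public long getAsLong() {
        return nowMs;
    }
}

// Hypothetical per-partition backoff tracker; not Kafka's AbstractFetcherThread logic.
final class PartitionBackoff {
    private final long fetchBackOffMs;
    private final LongSupplier clock;
    private final Map<String, Long> delayedUntilMs = new HashMap<>();

    PartitionBackoff(long fetchBackOffMs, LongSupplier clock) {
        this.fetchBackOffMs = fetchBackOffMs;
        this.clock = clock;
    }

    // Record a failed fetch: the partition is skipped until now + fetchBackOffMs.
    void markFailed(String partition) {
        delayedUntilMs.put(partition, clock.getAsLong() + fetchBackOffMs);
    }

    boolean isDelayed(String partition) {
        Long until = delayedUntilMs.get(partition);
        return until != null && clock.getAsLong() < until;
    }

    // Pause needed before the next fetch: 0 if any partition is ready,
    // otherwise the time until the earliest delayed partition becomes ready.
    long pauseMs(Iterable<String> partitions) {
        long now = clock.getAsLong();
        long minRemaining = Long.MAX_VALUE;
        for (String partition : partitions) {
            Long until = delayedUntilMs.get(partition);
            if (until == null || now >= until) {
                return 0L;
            }
            minRemaining = Math.min(minRemaining, until - now);
        }
        return minRemaining == Long.MAX_VALUE ? 0L : minRemaining;
    }
}
```

Because the test advances the clock explicitly (for example, mark a partition failed, advance 40 ms, and expect a 60 ms pause with a 100 ms backoff), the assertion no longer depends on how long a call takes on a loaded CI machine.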
[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
hachikuji commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r674988520 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int, } } - def enableTransactionalIdExpiration(): Unit = { -scheduler.schedule("transactionalId-expiration", () => { - val now = time.milliseconds() - inReadLock(stateLock) { -val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] = - transactionMetadataCache.flatMap { case (_, entry) => -entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match { - case Empty | CompleteCommit | CompleteAbort => true - case _ => false -} -}.filter { case (_, txnMetadata) => - txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs -}.map { case (transactionalId, txnMetadata) => - val txnMetadataTransition = txnMetadata.inLock { -txnMetadata.prepareDead() + private def collectExpiredTransactionalIds( +partitionId: Int, +partitionCacheEntry: TxnMetadataCacheEntry + ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = { +val currentTimeMs = time.milliseconds() + +inReadLock(stateLock) { + val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) + replicaManager.getLogConfig(transactionPartition) match { +case Some(logConfig) => + val maxBatchSize = logConfig.maxMessageSize + val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata] + + lazy val recordsBuilder = MemoryRecords.builder( +ByteBuffer.allocate(math.min(16384, maxBatchSize)), +TransactionLog.EnforcedCompressionType, +TimestampType.CREATE_TIME, +0L, +maxBatchSize + ) + + partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) => +txnMetadata.inLock { + if (!shouldExpire(txnMetadata, currentTimeMs)) { +true + } else if 
(maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) { +val transitMetadata = txnMetadata.prepareDead() +expired += TransactionalIdCoordinatorEpochAndMetadata( + transactionalId, + partitionCacheEntry.coordinatorEpoch, + transitMetadata +) +true + } else { +// If the batch is full, return false to end the search. Any remaining +// transactionalIds eligible for expiration can be picked next time. +false } - TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition) } - }.groupBy { transactionalIdCoordinatorEpochAndMetadata => - partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId) } -val recordsPerPartition = transactionalIdByPartition - .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) => -val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry => - new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null) -}.toArray -val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*) -val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition) -(topicPartition, records) + if (expired.isEmpty) { +(Seq.empty, MemoryRecords.EMPTY) + } else { +(expired, recordsBuilder.build()) } -def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = { - responses.forKeyValue { (topicPartition, response) => -inReadLock(stateLock) { - val toRemove = transactionalIdByPartition(topicPartition.partition) - transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry => -toRemove.foreach { idCoordinatorEpochAndMetadata => - val transactionalId = idCoordinatorEpochAndMetadata.transactionalId - val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId) - txnMetadata.inLock { -if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch - && 
txnMetadata.pendingState.contains(Dead) -
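The new `collectExpiredTransactionalIds` stops appending tombstones once the batch would exceed the partition's `maxMessageSize`, leaving the remaining IDs for a later run. The same control flow as a minimal Java sketch, not the coordinator's code: the byte estimates (`BATCH_OVERHEAD_BYTES`, `PER_RECORD_OVERHEAD_BYTES`) are assumed values, the real size check lives in `maybeAppendExpiration` (whose body is not shown here), and only the timestamp condition is kept, dropping the `Empty`/`CompleteCommit`/`CompleteAbort` state check for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of size-bounded expiration; not TransactionStateManager's code.
final class ExpirationBatcher {
    // Assumed overheads; real sizes depend on the record format and compression.
    static final int BATCH_OVERHEAD_BYTES = 61;
    static final int PER_RECORD_OVERHEAD_BYTES = 20;

    static int tombstoneBytes(String transactionalId) {
        return transactionalId.getBytes(StandardCharsets.UTF_8).length + PER_RECORD_OVERHEAD_BYTES;
    }

    // Collects IDs idle for at least expirationMs, stopping once the next tombstone
    // would push the batch past maxBatchSize; the rest wait for the next run.
    static List<String> collectExpired(Map<String, Long> lastUpdateMsById, long nowMs,
                                       long expirationMs, int maxBatchSize) {
        List<String> expired = new ArrayList<>();
        int batchBytes = BATCH_OVERHEAD_BYTES;
        for (Map.Entry<String, Long> entry : lastUpdateMsById.entrySet()) {
            if (entry.getValue() > nowMs - expirationMs) {
                continue; // not idle long enough to expire
            }
            int recordBytes = tombstoneBytes(entry.getKey());
            if (batchBytes + recordBytes > maxBatchSize) {
                break; // batch full: end the scan, like foreachWhile returning false
            }
            batchBytes += recordBytes;
            expired.add(entry.getKey());
        }
        return expired;
    }
}
```

Ending the scan on the first ID that does not fit, instead of skipping it, keeps each run's work bounded; whatever is left is picked up by the next scheduled run.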
[GitHub] [kafka] hachikuji commented on pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
hachikuji commented on pull request #11098: URL: https://github.com/apache/kafka/pull/11098#issuecomment-885065413 @dajac Yeah, I was thinking about that too. I'll open a Jira and we can address that separately.
[GitHub] [kafka] ijuma commented on a change in pull request #11108: KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures
ijuma commented on a change in pull request #11108: URL: https://github.com/apache/kafka/pull/11108#discussion_r674978912 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -785,7 +785,7 @@ class KafkaApis(val requestChannel: RequestChannel, Some(RecordBatch.MAGIC_VALUE_V1) else None - } + }.filter(_ => unconvertedRecords.batchIterator.hasNext) Review comment: That code is deprecated for removal and has been this way for years; is it worth it?
[GitHub] [kafka] ijuma commented on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases
ijuma commented on pull request #10811: URL: https://github.com/apache/kafka/pull/10811#issuecomment-885055345 @showuon, can you please file a blocker Jira for 3.0? Do you have cycles to fix this?
[GitHub] [kafka] hachikuji commented on a change in pull request #11108: KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures
hachikuji commented on a change in pull request #11108: URL: https://github.com/apache/kafka/pull/11108#discussion_r674975130 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -785,7 +785,7 @@ class KafkaApis(val requestChannel: RequestChannel, Some(RecordBatch.MAGIC_VALUE_V1) else None - } + }.filter(_ => unconvertedRecords.batchIterator.hasNext) Review comment: This is fine, I guess, but it kind of feels like we should fix the down-conversion code if it is broken.
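The one-line change relies on `Option.filter`: `Some(x).filter(p)` stays `Some(x)` only when `p` holds, and here the predicate ignores the magic value and checks whether the records contain any batch. Java's `Optional.filter` behaves the same way. A minimal sketch with hypothetical names, not `KafkaApis` code; the candidate magic is passed in because the selection logic above the hunk is not shown.

```java
import java.util.Optional;

// Hypothetical illustration of the Option.filter gate; names are not from KafkaApis.
final class DownConversionGate {
    // Keep the chosen target magic only if the fetched records contain at least one batch.
    static Optional<Byte> gate(Optional<Byte> candidateMagic, boolean hasBatches) {
        return candidateMagic.filter(magic -> hasBatches);
    }
}
```

With no batches the result is empty, so the caller sees the same value it would get when no down-conversion target was selected at all.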
[GitHub] [kafka] C0urante commented on pull request #10074: KAFKA-12305: Fix Flatten SMT for array types
C0urante commented on pull request #10074: URL: https://github.com/apache/kafka/pull/10074#issuecomment-885046027 @tombentley, I know it's been a while and we've probably missed the boat for 3.0, but I wanted to check in and see if there's anything blocking this PR from being merged at the moment.
[GitHub] [kafka] rondagostino edited a comment on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases
rondagostino edited a comment on pull request #10811: URL: https://github.com/apache/kafka/pull/10811#issuecomment-885022501 Yeah, it looks like we call `self.zk.describe()` in 3 system tests: the one above and also in `zookeeper_tls_encrypt_only_test.py` and `zookeeper_tls_test.py`. The latter two are simply confirming that the `--zk-tls-config-file` parameter will work with `kafka-configs.sh` (i.e., that it can talk to TLS-enabled ZooKeeper nodes), so we could easily change those to do something with SCRAM configs. The one mentioned here is simply trying to surface an error as early as possible, as per the comment:
```
# Confirm we have a successful ZooKeeper upgrade by describing the topic.
# Not trying to detect a problem here leads to failure in the ensuing Kafka roll, which would be a less
# intuitive failure than seeing a problem here, so detect ZooKeeper upgrade problems before involving Kafka.
self.zk.describe(self.topic)
```
So we could either just get rid of it or maybe list ACLs/do something with SCRAM. It also looks like `describe()` and the above 3 invocations of it are the only things we have to worry about here -- the ZK Security Migrator in `zookeeper_migration()` and listing ACLs via the `list_acls()` method are the only other things that access ZooKeeper directly, and both of these are fully supported.
[GitHub] [kafka] ijuma edited a comment on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases
ijuma edited a comment on pull request #10811: URL: https://github.com/apache/kafka/pull/10811#issuecomment-885004599 @showuon I'm seeing the following in the system test `upgrade_test` (from_kafka_version=0.9.0.1.to_message_format_version=0.9.0.1.compression_types=.none):
> INFO - 2021-07-22 08:16:06,976 - runner_client - log - lineno:241]: RunnerClient: kafkatest.tests.core.upgrade_test.TestUpgrade.test_upgrade.from_kafka_version=0.9.0.1.to_message_format_version=0.9.0.1.compression_types=.none: FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker11', 'hostname': 'ducker11', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': '/home/ducker/.ssh/id_rsa'}, 'hostname': 'ducker11', 'ssh_hostname': 'ducker11', 'user': 'ducker', 'externally_routable_ip': 'ducker11', '_logger': , 'os': 'linux', '_ssh_client': , '_sftp_client': }, '/opt/kafka-dev/bin/kafka-run-class.sh kafka.admin.ConfigCommand --zookeeper ducker11:2181 --describe --topic test_topic', 1, b'')
> Traceback (most recent call last):
> File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 133, in run
> data = self.run_test()
> File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 190, in run_test
> return self.test_context.function(self.test)
> File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 429, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
> File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 199, in test_upgrade
> cluster_id = self.kafka.cluster_id()
> File "/opt/kafka-dev/tests/kafkatest/tests/produce_consume_validate.py", line 105, in run_produce_consume_validate
> core_test_action(*args)
> File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 200, in
> assert cluster_id is not None
> File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 57, in perform_upgrade
> self.zk.describe(self.topic)
> File "/opt/kafka-dev/tests/kafkatest/services/zookeeper.py", line 234, in describe
> output = self.nodes[0].account.ssh_output(cmd)
> File "/usr/local/lib/python3.7/dist-packages/ducktape/cluster/remoteaccount.py", line 370, in ssh_output
> raise RemoteCommandError(self, cmd, exit_status, stderr.read())
> ducktape.cluster.remoteaccount.RemoteCommandError: ducker@ducker11: Command '/opt/kafka-dev/bin/kafka-run-class.sh kafka.admin.ConfigCommand --zookeeper ducker11:2181 --describe --topic test_topic' returned non-zero exit status 1.

Is this related to this PR?
[GitHub] [kafka] ijuma opened a new pull request #11108: KAFKA-13116: Adjust system tests due to KAFKA-12944
ijuma opened a new pull request #11108: URL: https://github.com/apache/kafka/pull/11108
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API
vvcephei commented on a change in pull request #11099: URL: https://github.com/apache/kafka/pull/11099#discussion_r674847014 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java ## @@ -92,33 +98,62 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte } @Override -public void process(final K key, final V value) { +public void process(final Record record) { // if the key is null, then ignore the record -if (key == null) { -LOG.warn( -"Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]", -context().topic(), context().partition(), context().offset() -); +if (record.key() == null) { +if (context.recordMetadata().isPresent()) { +final RecordMetadata recordMetadata = context.recordMetadata().get(); +LOG.warn( +"Skipping record due to null key. " ++ "value=[{}] topic=[{}] partition=[{}] offset=[{}]", +record.value(), +recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() +); +} else { +LOG.warn( +"Skipping record due to null key. " ++ "value=[{}]. Topic, partition, and offset not known.", +record.value() +); +} droppedRecordsSensor.record(); return; } if (queryableName != null) { -final ValueAndTimestamp oldValueAndTimestamp = store.get(key); -final V oldValue; +final ValueAndTimestamp oldValueAndTimestamp = store.get(record.key()); +final VIn oldValue; if (oldValueAndTimestamp != null) { oldValue = oldValueAndTimestamp.value(); -if (context().timestamp() < oldValueAndTimestamp.timestamp()) { -LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.", -store.name(), context().offset(), context().partition()); +if (record.timestamp() < oldValueAndTimestamp.timestamp()) { +if (context.recordMetadata().isPresent()) { +final RecordMetadata recordMetadata = context.recordMetadata().get(); +LOG.warn( +"Detected out-of-order KTable update for {}, " ++ "old timestamp=[{}] new timestamp=[{}]. 
" ++ "value=[{}] topic=[{}] partition=[{}] offset=[{}].", Review comment: also here ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java ## @@ -92,33 +98,62 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte } @Override -public void process(final K key, final V value) { +public void process(final Record record) { // if the key is null, then ignore the record -if (key == null) { -LOG.warn( -"Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]", -context().topic(), context().partition(), context().offset() -); +if (record.key() == null) { +if (context.recordMetadata().isPresent()) { +final RecordMetadata recordMetadata = context.recordMetadata().get(); +LOG.warn( +"Skipping record due to null key. " ++ "value=[{}] topic=[{}] partition=[{}] offset=[{}]", Review comment: Oh, I'm sorry, but it looks like we need one more revision. Useful as it would be at times, we can't log any data (keys, values, or headers) because it might leak sensitive information into the logs. ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java ## @@ -156,7 +156,7 @@ public void kTableShouldLogAndMeterOnSkippedRecords() { .filter(e -> e.getLevel().equals("WARN")) .map(Event::getMessage) .collect(Collectors.toList()), -hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]") +hasItem("Skipping record due to null key. value=[value] topic=[topic] partition=[0] offset=[0]") Review comment: I probably don't need to point this out, but this will have to change back when you remove the value from the production code. On
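Following vvcephei's point that keys, values, and headers must not be logged, the null-key warning can be built from the optional record metadata alone. A minimal Java sketch under assumptions: `SourceMetadata` is a hypothetical stand-in for what `context.recordMetadata()` returns in the diff, and the string is assembled directly instead of through SLF4J `{}` placeholders so it can be checked without a logger.

```java
import java.util.Optional;

// Hypothetical stand-in for the topic/partition/offset metadata; not Kafka's RecordMetadata.
final class SourceMetadata {
    final String topic;
    final int partition;
    final long offset;

    SourceMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class NullKeyWarning {
    // Builds the warning from metadata only; keys, values, and headers are never included.
    static String message(Optional<SourceMetadata> metadata) {
        if (metadata.isPresent()) {
            SourceMetadata m = metadata.get();
            return "Skipping record due to null key. topic=[" + m.topic
                + "] partition=[" + m.partition + "] offset=[" + m.offset + "]";
        }
        return "Skipping record due to null key. Topic, partition, and offset not known.";
    }
}
```

With metadata for topic `topic`, partition 0, offset 0, the message matches the expectation `KTableSourceTest` had before `value=[value]` was added.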
[GitHub] [kafka] showuon commented on pull request #11105: KAFKA-13123: close KeyValueIterator instances in example code and tests
showuon commented on pull request #11105: URL: https://github.com/apache/kafka/pull/11105#issuecomment-884931769 @mjsax @bbejeck, could you help review this PR (and the other two similar PRs, #11106 and #11107)? Thank you.
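These PRs close `KeyValueIterator` instances, which Kafka Streams declares as `Closeable`; try-with-resources closes the iterator even if iteration throws, so underlying store resources are not leaked. A minimal Java sketch with a hypothetical `CloseableKvIterator` interface and `TrackingIterator` standing in for the real types, so it runs without Kafka on the classpath.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a store iterator such as Kafka Streams' KeyValueIterator.
interface CloseableKvIterator<K, V> extends Iterator<Map.Entry<K, V>>, AutoCloseable {
    @Override
    void close(); // no checked exception, so try-with-resources needs no catch block
}

// Hypothetical test double that records whether close() was called.
final class TrackingIterator<K, V> implements CloseableKvIterator<K, V> {
    private final Iterator<Map.Entry<K, V>> delegate;
    boolean closed = false;

    TrackingIterator(List<Map.Entry<K, V>> entries) {
        this.delegate = entries.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public Map.Entry<K, V> next() {
        return delegate.next();
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class StoreScan {
    // Sums the values; the iterator is closed when the try block exits, normally or not.
    static long sumValues(CloseableKvIterator<String, Long> iter) {
        long total = 0;
        try (CloseableKvIterator<String, Long> it = iter) {
            while (it.hasNext()) {
                total += it.next().getValue();
            }
        }
        return total;
    }
}
```

A tracking double like this also makes "iterator was closed" directly assertable in tests.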
[GitHub] [kafka] showuon commented on pull request #11105: KAFKA-13123: close KeyValueIterator instances in example code and tests
showuon commented on pull request #11105: URL: https://github.com/apache/kafka/pull/11105#issuecomment-884877170 Failed tests are unrelated. Thanks.
```
Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout()
Build / JDK 11 and Scala 2.13 / kafka.log.LogCleanerParameterizedIntegrationTest.[5] codec=ZSTD
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 11 and Scala 2.13 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
```
[GitHub] [kafka] showuon commented on pull request #11106: KAFKA-13124: close KeyValueIterator instance in internals tests (part 1)
showuon commented on pull request #11106: URL: https://github.com/apache/kafka/pull/11106#issuecomment-884876686 Failed tests are unrelated. Thanks.
```
Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 8 and Scala 2.12 / kafka.network.SocketServerTest.testIdleConnection()
Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsExpirationTest.testBumpTransactionalEpochAfterInvalidProducerIdMapping()
Build / JDK 16 and Scala 2.13 / kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToLatest()
Build / JDK 16 and Scala 2.13 / kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsShiftByLowerThanEarliest()
Build / JDK 16 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
```
[GitHub] [kafka] JoeCqupt edited a comment on pull request #11089: MINOR: remove unnecessary judgment in AdminUtils::assignReplicasToBrokersRackAware
JoeCqupt edited a comment on pull request #11089: URL: https://github.com/apache/kafka/pull/11089#issuecomment-884671155 call for review @ijuma @guozhangwang @hachikuji @mjsax
[GitHub] [kafka] mimaison commented on pull request #10277: KAFKA-9914: Fix replication cycle detection
mimaison commented on pull request #10277: URL: https://github.com/apache/kafka/pull/10277#issuecomment-884830191 @tvainika, it looks like a few `IdentityReplicationIntegrationTest` tests are failing. Can you take a look?
[GitHub] [kafka] mimaison commented on pull request #10973: KAFKA-13033: COORDINATOR_NOT_AVAILABLE should be unmapped
mimaison commented on pull request #10973: URL: https://github.com/apache/kafka/pull/10973#issuecomment-884824815 Sorry I was away for a bit. Thanks @showuon and @dajac for following up on KIP-699!
[GitHub] [kafka] dajac commented on pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
dajac commented on pull request #11098: URL: https://github.com/apache/kafka/pull/11098#issuecomment-884809208 btw, I was looking at the code that expires groups, and it seems that it does not consider the max batch size either, right?
[GitHub] [kafka] dajac commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
dajac commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r674676502
## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
## @@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
Review comment: I do agree that 16k seems quite reasonable for the common case. The downside is that we have to wait another hour to clean the remaining ones if there are many transactions to be expired.
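The trade-off in this comment can be put in numbers. The following is a minimal sketch, not Kafka code, assuming that each scheduled cleanup run writes at most one batch of tombstones per partition and that every tombstone takes the same number of bytes (both simplifications: real sizes depend on the transactional ID length and batch overhead). The class and method names, the 100-byte record size, and the 1 MiB batch limit are hypothetical; only `math.min(16384, maxBatchSize)` comes from the diff.

```java
// Hypothetical sketch (not Kafka code): how a per-run batch cap spreads
// transactional-ID expiration over several scheduled cleanup runs.
final class ExpirationBatching {

    // Mirrors math.min(16384, maxBatchSize) from the diff: the initial
    // allocation never exceeds the configured max batch size.
    static int initialBufferSize(int maxBatchSize) {
        return Math.min(16384, maxBatchSize);
    }

    // Tombstones that fit in one batch, assuming every record has the same
    // (hypothetical) size and ignoring batch header overhead.
    static int recordsPerBatch(int batchLimitBytes, int recordSizeBytes) {
        if (recordSizeBytes <= 0) {
            throw new IllegalArgumentException("recordSizeBytes must be positive");
        }
        return batchLimitBytes / recordSizeBytes;
    }

    // Runs needed to drain a backlog when each run writes at most one batch.
    static long runsNeeded(long expiredIds, int batchLimitBytes, int recordSizeBytes) {
        int perBatch = recordsPerBatch(batchLimitBytes, recordSizeBytes);
        if (perBatch == 0) {
            throw new IllegalArgumentException("a single record does not fit in a batch");
        }
        return (expiredIds + perBatch - 1) / perBatch; // ceiling division
    }

    public static void main(String[] args) {
        // 10,000 expired IDs at ~100 bytes each: 16 KiB batches vs. 1 MiB batches.
        System.out.println(runsNeeded(10_000, 16_384, 100));    // 62
        System.out.println(runsNeeded(10_000, 1_048_576, 100)); // 1
    }
}
```

If runs are an hour apart, as the comment implies, a batch capped at 16 KiB would need about 62 hours to clear that hypothetical backlog, while a batch bounded by a larger max batch size clears it in one run; a small initial buffer that can grow up to `maxBatchSize` avoids paying for the large allocation in the common case.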
## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
## @@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be
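The `foreachWhile` loop in this hunk keeps scanning while the callback returns `true` and stops at the first `false`: entries that are not yet eligible are skipped, eligible ones are appended while they fit, and a full batch ends the scan so the rest wait for a later run. A minimal Java sketch of that control flow, assuming a fixed (hypothetical) tombstone size per ID; `ForeachWhileSketch`, `foreachWhile`, and `collectExpired` here are illustrative names, not Kafka APIs, and unlike the removed code in the diff it checks only the timestamp, not the transaction state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical sketch of the early-exit scan in the diff; names are illustrative, not Kafka APIs.
final class ForeachWhileSketch {

    // Visits entries in iteration order until the callback returns false.
    static <K, V> void foreachWhile(Map<K, V> map, BiPredicate<K, V> f) {
        for (Map.Entry<K, V> e : map.entrySet()) {
            if (!f.test(e.getKey(), e.getValue())) {
                return;
            }
        }
    }

    // Collects expired IDs while their tombstones still fit under maxBatchBytes.
    // Only the last-update timestamp is checked in this sketch.
    static List<String> collectExpired(Map<String, Long> lastUpdateMs, long nowMs, long expirationMs,
                                       int tombstoneBytes, int maxBatchBytes) {
        List<String> expired = new ArrayList<>();
        int[] usedBytes = {0}; // mutable counter captured by the lambda
        foreachWhile(lastUpdateMs, (id, lastUpdate) -> {
            if (lastUpdate > nowMs - expirationMs) {
                return true;  // not eligible yet: skip it and keep scanning
            }
            if (usedBytes[0] + tombstoneBytes > maxBatchBytes) {
                return false; // batch full: stop; remaining IDs wait for the next run
            }
            usedBytes[0] += tombstoneBytes;
            expired.add(id);
            return true;
        });
        return expired;
    }

    public static void main(String[] args) {
        Map<String, Long> lastUpdate = new LinkedHashMap<>();
        lastUpdate.put("a", 0L);
        lastUpdate.put("b", 950L); // updated recently, so not expired
        lastUpdate.put("c", 10L);
        lastUpdate.put("d", 20L);
        // Room for two 10-byte tombstones: "a" and "c" fit, "d" is deferred.
        System.out.println(collectExpired(lastUpdate, 1_000L, 100L, 10, 20)); // [a, c]
    }
}
```

Returning `false` only on a full batch, rather than on the first non-expired entry, is what lets the scan step over recently updated IDs without ending early.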
[jira] [Resolved] (KAFKA-13069) Add magic number to DefaultKafkaPrincipalBuilder.KafkaPrincipalSerde
[ https://issues.apache.org/jira/browse/KAFKA-13069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ron Dagostino resolved KAFKA-13069.
---
Resolution: Invalid
Flexible fields are sufficient as per the KIP-590 VOTE email thread, so a magic number will not be needed.
> Add magic number to DefaultKafkaPrincipalBuilder.KafkaPrincipalSerde
>
> Key: KAFKA-13069
> URL: https://issues.apache.org/jira/browse/KAFKA-13069
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.0, 2.8.0
> Reporter: Ron Dagostino
> Assignee: Ron Dagostino
> Priority: Critical
> Fix For: 3.1.0
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac merged pull request #11079: MINOR: Small refactoring in admin group handlers
dajac merged pull request #11079: URL: https://github.com/apache/kafka/pull/11079
[jira] [Updated] (KAFKA-13122) Close KeyValueIterator implemented instance to avoid resource leak
[ https://issues.apache.org/jira/browse/KAFKA-13122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen updated KAFKA-13122:
---
Description:
I found that "many" KeyValueIterator instances are not explicitly closed, which causes resource leaks.
From the javadoc of KeyValueIterator:
* Users must call its {@code close} method explicitly upon completeness to release resources
This issue mostly happens in tests, because we usually query a state store to get a result iterator and then verify the results, but forget to close the iterator. The issue also *appears in the example code in our developer guide docs*.
I'll use try-with-resources to fix them. To avoid creating a huge PR, I split this bug into 3 sub-tasks.
was:
I found that "many" KeyValueIterator instances are not explicitly closed, which causes resource leaks.
This issue mostly happens in tests, because we usually query a state store to get a result iterator and then verify the results, but forget to close the iterator. The issue also *appears in the example code in our developer guide docs*.
I'll use try-with-resources to fix them. To avoid creating a huge PR, I split this bug into 3 sub-tasks.
> Close KeyValueIterator implemented instance to avoid resource leak
>
> Key: KAFKA-13122
> URL: https://issues.apache.org/jira/browse/KAFKA-13122
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
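The try-with-resources fix proposed here can be illustrated without a running state store. This is a minimal sketch under stated assumptions: `SimpleKeyValueIterator` and `TrackingIterator` are hypothetical stand-ins that only mimic the shape of Kafka Streams' `KeyValueIterator` (an `Iterator` that is also `Closeable`, whose `close()` throws no checked exception); they are not the real Kafka types. In real code the resource would come from a store query, for example `store.all()` on a `ReadOnlyKeyValueStore`.

```java
import java.io.Closeable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class IteratorUsage {

    // try-with-resources calls close() on every exit path, including exceptions.
    static long sumValues(TrackingIterator<String, Long> iter) {
        long sum = 0;
        try (TrackingIterator<String, Long> it = iter) {
            while (it.hasNext()) {
                sum += it.next().getValue();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>();
        entries.add(new AbstractMap.SimpleEntry<>("a", 1L));
        entries.add(new AbstractMap.SimpleEntry<>("b", 2L));
        TrackingIterator<String, Long> iter = new TrackingIterator<>(entries);
        System.out.println(sumValues(iter) + " closed=" + iter.isClosed()); // 3 closed=true
    }
}

// Hypothetical stand-in with the same shape as Kafka Streams' KeyValueIterator:
// an Iterator that is also Closeable, with close() declaring no checked exception.
interface SimpleKeyValueIterator<K, V> extends Iterator<Map.Entry<K, V>>, Closeable {
    @Override
    void close();
}

// Test double that records whether close() was called.
final class TrackingIterator<K, V> implements SimpleKeyValueIterator<K, V> {
    private final Iterator<? extends Map.Entry<K, V>> delegate;
    private boolean closed = false;

    TrackingIterator(Iterable<? extends Map.Entry<K, V>> entries) {
        this.delegate = entries.iterator();
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public Map.Entry<K, V> next() {
        if (closed) {
            throw new IllegalStateException("iterator already closed");
        }
        return delegate.next();
    }

    @Override
    public void close() {
        closed = true; // a real store iterator would release its resources here
    }

    boolean isClosed() {
        return closed;
    }
}
```

Because `close()` is narrowed to throw no checked exception, callers need no `catch` block, and a failing assertion inside the `try` body still closes the iterator before the error propagates, which is the case the issue describes for tests.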