[GitHub] [kafka] showuon merged pull request #11880: MINOR: Fix comments in TransactionsTest
showuon merged pull request #11880: URL: https://github.com/apache/kafka/pull/11880 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #11881: MINOR: revert back to 60s session timeout for static membership roll bounce test
showuon opened a new pull request #11881: URL: https://github.com/apache/kafka/pull/11881 In this PR: https://github.com/apache/kafka/pull/11236/files#diff-bd4be654a82d362772b2010a0fa22a44916ff0da241ca7e6072741f7ef710136L689-R696 , we tried to reduce the session timeout so the tests run faster. However, we accidentally overrode the session timeout in the static membership tests, which deliberately use a longer session timeout to verify that a static member won't trigger a rebalance. Revert it back to 60 seconds. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ddrid opened a new pull request #11880: MINOR: Fix comments in TransactionsTest
ddrid opened a new pull request #11880: URL: https://github.com/apache/kafka/pull/11880
[GitHub] [kafka] kowshik closed pull request #11815: MINOR: Eliminate FileRecords mutable flag
kowshik closed pull request #11815: URL: https://github.com/apache/kafka/pull/11815
[GitHub] [kafka] XiaoyiPeng opened a new pull request #11879: KAFKA-13728: fix PushHttpMetricsReporter no longer pushes metrics when network failure is recovered.
XiaoyiPeng opened a new pull request #11879: URL: https://github.com/apache/kafka/pull/11879 The class `PushHttpMetricsReporter` no longer pushes metrics after a network failure has recovered. I debugged the code and found the problem here: https://github.com/apache/kafka/blob/dc36dedd28ff384218b669de13993646483db966/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java#L214-L221 When we submit a periodic task to the `ScheduledThreadPoolExecutor`, and the task throws an exception that is not swallowed, the task will no longer be scheduled for execution.
[jira] [Created] (KAFKA-13728) PushHttpMetricsReporter no longer pushes metrics when network failure is recovered.
XiaoyiPeng created KAFKA-13728: -- Summary: PushHttpMetricsReporter no longer pushes metrics when network failure is recovered. Key: KAFKA-13728 URL: https://issues.apache.org/jira/browse/KAFKA-13728 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 3.1.0 Reporter: XiaoyiPeng The class *PushHttpMetricsReporter* no longer pushes metrics when network failure is recovered. I debugged the code and found the problem here : [https://github.com/apache/kafka/blob/dc36dedd28ff384218b669de13993646483db966/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java#L214-L221] When we submit a task to the *ScheduledThreadPoolExecutor* that needs to be executed periodically, if the task throws an exception and is not swallowed, the task will no longer be scheduled to execute. -- This message was sent by Atlassian Jira (v8.20.1#820001)
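The executor behavior described above is easy to reproduce in isolation. The sketch below is not the actual PushHttpMetricsReporter code; it schedules two periodic tasks: one lets an exception escape and is silently cancelled after its first run, while the other swallows the exception and keeps being rescheduled, which is the essence of the proposed fix.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SwallowDemo {
    // Runs two periodic tasks for ~200ms and reports how often each executed.
    static int[] runDemo() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        AtomicInteger failing = new AtomicInteger();
        AtomicInteger guarded = new AtomicInteger();

        // Task 1: lets the exception escape. Per the ScheduledExecutorService
        // contract, an uncaught exception suppresses all subsequent executions.
        executor.scheduleAtFixedRate(() -> {
            failing.incrementAndGet();
            throw new RuntimeException("simulated HTTP push failure");
        }, 0, 10, TimeUnit.MILLISECONDS);

        // Task 2: swallows the exception, so the periodic schedule survives.
        executor.scheduleAtFixedRate(() -> {
            guarded.incrementAndGet();
            try {
                throw new RuntimeException("simulated HTTP push failure");
            } catch (RuntimeException e) {
                // log and continue; the next run will still happen
            }
        }, 0, 10, TimeUnit.MILLISECONDS);

        Thread.sleep(200);
        executor.shutdownNow();
        return new int[] {failing.get(), guarded.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = runDemo();
        System.out.println("unguarded task ran " + counts[0] + " time(s)");
        System.out.println("guarded task ran " + counts[1] + " time(s)");
    }
}
```

The unguarded task runs exactly once; the guarded one keeps running until shutdown.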
[jira] [Updated] (KAFKA-13727) Edge case in cleaner can result in premature removal of ABORT marker
[ https://issues.apache.org/jira/browse/KAFKA-13727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13727: Description: The log cleaner works by first building a map of the active keys beginning from the dirty offset, and then scanning forward from the beginning of the log to decide which records should be retained based on whether they are included in the map. The map of keys has a limited size. As soon as it fills up, we stop building it. The offset corresponding to the last record that was included in the map becomes the next dirty offset. Then when we are cleaning, we stop scanning forward at the dirty offset. Or to be more precise, we continue scanning until the end of the segment which includes the dirty offset, but all records above that offset are copied as-is without checking the map of active keys. Compaction is complicated by the presence of transactions. The cleaner must keep track of which transactions have data remaining so that it can tell when it is safe to remove the respective markers. It works a bit like the consumer: before scanning a segment, the cleaner consults the aborted transaction index to figure out which transactions have been aborted. All other transactions are considered committed. The problem we have found is that the cleaner does not take into account the range of offsets between the dirty offset and the end offset of the segment containing it when querying ahead for aborted transactions. This means that when the cleaner is scanning forward from the dirty offset, it does not have the complete set of aborted transactions. The main consequence of this is that abort markers associated with transactions which start within this range of offsets become eligible for deletion even before the corresponding data has been removed from the log. Here is an example. 
Suppose that the log contains the following entries:

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b
offset=7, ABORT

Suppose we have an offset map which can only contain 2 keys and the dirty offset starts at 0. The first time we scan forward, we will build a map with keys a and b, which will allow us to move the dirty offset up to 3. Due to the issue documented here, we will not detect the aborted transaction starting at offset 6. But it will not be eligible for deletion on this round of cleaning because it is bound by `delete.retention.ms`. Instead, our new logic will set the deletion horizon for this batch based on the current time plus the configured `delete.retention.ms`:

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b
offset=7, ABORT (deleteHorizon: N)

Suppose that the time reaches N+1 before the next cleaning. We will begin from the dirty offset of 3 and collect keys c and d before stopping at offset 6. Again, we will not detect the aborted transaction beginning at offset 6 since it is out of the range. This time when we scan, the marker at offset 7 will be deleted because the transaction will be seen as empty and the deletion horizon has now passed. So we end up with this state:

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b

Effectively it becomes a hanging transaction. The interesting thing is that we might not even detect it. As far as the leader is concerned, it had already completed that transaction, so it is not expecting any additional markers. The transaction index would have been rewritten without the aborted transaction when the log was cleaned, so any consumer fetching the data would see the transaction as committed. 
On the other hand, if we did a reassignment to a new replica, or if we had to rebuild the full log state during recovery, then we would suddenly detect it. I am not sure how likely this scenario is in practice. I think it's fair to say it is an extremely rare case. The cleaner has to fail to clean a full segment at least two times and you still need enough time to pass for the marker's deletion horizon to be reached. Perhaps it is possible if the cardinality of keys is very high and the configured memory limit for the cleaner is low.
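The first cleaning pass in the example above (a bounded key map that determines the next dirty offset) can be modeled in a few lines. This is only an illustrative sketch of the capacity limit; Kafka's actual cleaner uses a hash-based offset map and per-segment grouping, neither of which is shown here.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CleanerSketch {
    // One record in the simplified log: a key for data records,
    // or null for control markers (COMMIT/ABORT), which need no map slot.
    record Entry(long offset, String key) {}

    // Collect keys from the dirty offset until the bounded map fills; the
    // first record that no longer fits becomes the next dirty offset.
    static long nextDirtyOffset(List<Entry> log, long dirtyOffset, int capacity) {
        Set<String> keys = new HashSet<>();
        for (Entry e : log) {
            if (e.offset() < dirtyOffset) continue;
            if (e.key() == null) continue;                  // markers are skipped
            if (keys.contains(e.key())) continue;           // key already mapped
            if (keys.size() == capacity) return e.offset(); // map full: stop here
            keys.add(e.key());
        }
        return log.size();                                  // reached the log end
    }

    public static void main(String[] args) {
        // The log from the example: a, b, COMMIT, c, d, COMMIT, b, ABORT
        List<Entry> log = List.of(
            new Entry(0, "a"), new Entry(1, "b"), new Entry(2, null),
            new Entry(3, "c"), new Entry(4, "d"), new Entry(5, null),
            new Entry(6, "b"), new Entry(7, null));
        long first = nextDirtyOffset(log, 0, 2);      // map {a, b} -> dirty offset 3
        long second = nextDirtyOffset(log, first, 2); // map {c, d} -> dirty offset 6
        System.out.println(first + " " + second);     // prints "3 6"
    }
}
```

This reproduces the two passes in the description: the dirty offset advances to 3, then to 6, while the ABORT at offset 7 stays outside both scanned key ranges.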
[jira] [Created] (KAFKA-13727) Edge case in cleaner can result in premature removal of ABORT marker
Jason Gustafson created KAFKA-13727: --- Summary: Edge case in cleaner can result in premature removal of ABORT marker Key: KAFKA-13727 URL: https://issues.apache.org/jira/browse/KAFKA-13727 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson
[GitHub] [kafka] wcarlson5 commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
wcarlson5 commented on pull request #11868: URL: https://github.com/apache/kafka/pull/11868#issuecomment-1064636537 @guozhangwang yes that would be a concern. I think we need to document that these calls need to be made in the same order with the same topologies for each client. That is what KSQL does to make sure it works.
[GitHub] [kafka] guozhangwang commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
guozhangwang commented on pull request #11868: URL: https://github.com/apache/kafka/pull/11868#issuecomment-1064634922 @wcarlson5 @ableegoldman I'm wondering if we could still have live lock scenarios like this: say we have two topologies A and B, and two threads a and b, each on a different KS instance. Each thread tries to remove one topology at a time, getting the future to make sure it is cleaned up, BUT they do it in a different order.

1) Thread a calls `remove(A)` and gets a futureA.
2) Thread b calls `remove(B)` and gets a futureB.
3) Thread a calls `futureA.get()` trying to delete offsets, and after that it tries to `remove(B)`, but gets blocked on topology A not removed by thread b yet.
4) Thread b calls `futureB.get()` trying to delete offsets, and after that it tries to `remove(A)`, but gets blocked on topology B not removed by thread a yet.

Would that be a concern?
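The hazard in this scenario is the classic inconsistent-ordering problem: two clients that block on removals in opposite orders can each end up waiting on the other. The usual discipline, matching the suggestion in this thread to make the calls in the same order on every client, is to derive one canonical removal order. A minimal sketch follows; the helper is hypothetical, not a Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class OrderedRemoval {
    // Hypothetical helper: every client sorts the topologies it wants to
    // remove, so no two clients ever block on each other in opposite orders.
    static List<String> removalOrder(Collection<String> topologies) {
        List<String> ordered = new ArrayList<>(topologies);
        Collections.sort(ordered); // one canonical order for every client
        return ordered;
    }

    public static void main(String[] args) {
        // Both "clients" compute the same order regardless of input order.
        System.out.println(removalOrder(List.of("B", "A"))); // prints [A, B]
        System.out.println(removalOrder(List.of("A", "B"))); // prints [A, B]
    }
}
```

With every client removing in the same order and waiting on each future before the next `remove`, the circular wait in steps 3) and 4) cannot form.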
[GitHub] [kafka] lihaosky opened a new pull request #11878: fix flaky EosIntegrationTest.shouldCommitCorrectOffsetIfInputTopicIsTransactional[at_least_once]
lihaosky opened a new pull request #11878: URL: https://github.com/apache/kafka/pull/11878 In this test, we started the Kafka Streams app and then wrote to the input topic in a transaction. It's possible that when Streams commits offsets, the transaction hasn't finished yet, so the committed offset could be less than the eventual `endOffset`. This PR moves the logic of writing to the input topic before starting the Streams app.

*Summary of testing strategy (including rationale)* Ran the test 30 times and didn't see a failure. Without this change, it always failed within 20 runs:

for (( c=1; c<=30; c++ )); do ./gradlew cleanTest streams:test -PshowStandardStreams=true --tests EosIntegrationTest.shouldCommitCorrectOffsetIfInputTopicIsTransactional[at_least_once] >> test_output 2>&1; done

Output: https://gist.github.com/lihaosky/ab0661a5e453d2e0970dece6a29641f2
[GitHub] [kafka] bbejeck commented on pull request #11870: MINOR: jmh.sh swallows compile errors
bbejeck commented on pull request #11870: URL: https://github.com/apache/kafka/pull/11870#issuecomment-1064613347 Merged #11870 to trunk
[GitHub] [kafka] bbejeck merged pull request #11870: MINOR: jmh.sh swallows compile errors
bbejeck merged pull request #11870: URL: https://github.com/apache/kafka/pull/11870
[GitHub] [kafka] bbejeck commented on pull request #11870: MINOR: jmh.sh swallows compile errors
bbejeck commented on pull request #11870: URL: https://github.com/apache/kafka/pull/11870#issuecomment-1064609464 test failures unrelated
[GitHub] [kafka] ijuma commented on pull request #11870: MINOR: jmh.sh swallows compile errors
ijuma commented on pull request #11870: URL: https://github.com/apache/kafka/pull/11870#issuecomment-1064595311 Go for it.
[GitHub] [kafka] bbejeck commented on pull request #11870: MINOR: jmh.sh swallows compile errors
bbejeck commented on pull request #11870: URL: https://github.com/apache/kafka/pull/11870#issuecomment-1064549485 @ijuma I was going to merge this; do you have any objections or comments?
[GitHub] [kafka] ableegoldman commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
ableegoldman commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064538600 Merged to trunk
[GitHub] [kafka] ableegoldman merged pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
ableegoldman merged pull request #11873: URL: https://github.com/apache/kafka/pull/11873
[GitHub] [kafka] ijuma commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
ijuma commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064536922 Thanks for the details @wcarlson5
[GitHub] [kafka] ableegoldman commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
ableegoldman commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064522689 Test failures are unrelated. Going to merge. Thanks for digging into this @wcarlson5 and @lihaosky. I hope we can figure out the cause and fix it soon, as I was looking forward to having the rebalance reason available in the JoinGroup :/
[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504584#comment-17504584 ] Randall Hauch edited comment on KAFKA-12879 at 3/10/22, 9:22 PM: - Update: I'm having trouble with backporting the Connect changes to the 2.5 branch. Because this branch is so old, I'm going to just revert the AdminClient behavior to the 2.5 branch, and _NOT_ backport the Connect retry changes. Note: the break due to KAFKA-12339 was never released in the 2.5 branch. > Compatibility break in Admin.listOffsets() > -- > > Key: KAFKA-12879 > URL: https://issues.apache.org/jira/browse/KAFKA-12879 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0, 2.7.1, 2.6.2 >Reporter: Tom Bentley >Assignee: Philip Nee >Priority: Major > Fix For: 2.5.2, 2.8.2, 3.2.0, 3.1.1, 3.0.2, 2.7.3, 2.6.4 > > > KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). > Previously it would fail with {{UnknownTopicOrPartitionException}} when a > topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. > It seems this was more or less intentional, even though it would break code > which was expecting and handling the {{UnknownTopicOrPartitionException}}. A > workaround is to use {{retries=1}} and inspect the cause of the > {{TimeoutException}}, but this isn't really suitable for cases where the same > Admin client instance is being used for other calls where retries is > desirable. > Furthermore as well as the intended effect on {{listOffsets()}} it seems that > the change could actually affect other methods of Admin. > More generally, the Admin client API is vague about which exceptions can > propagate from which methods. 
This means that it's not possible to say, in > cases like this, whether the calling code _should_ have been relying on the > {{UnknownTopicOrPartitionException}} or not.
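The {{retries=1}} workaround mentioned in the issue amounts to isolating the call on a dedicated Admin client and walking the cause chain of the resulting exception. The cause-walking helper below is runnable as-is; the Admin/listOffsets wiring in the comment is a hypothetical sketch that needs a live cluster and the kafka-clients dependency, and is not verified here.

```java
import java.util.concurrent.ExecutionException;

public class CauseUnwrap {
    // Walk a throwable's cause chain looking for a given exception type.
    static boolean causedBy(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (type.isInstance(c)) return true;
        }
        return false;
    }

    /*
     * Hypothetical usage against a broker (not runnable standalone):
     *
     *   Properties props = new Properties();
     *   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     *   props.put(AdminClientConfig.RETRIES_CONFIG, 1);
     *   try (Admin admin = Admin.create(props)) {
     *       admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
     *            .partitionResult(tp).get();
     *   } catch (ExecutionException e) {
     *       if (causedBy(e, UnknownTopicOrPartitionException.class)) { ... }
     *   }
     */
    public static void main(String[] args) {
        // Simulate a wrapped failure: outer timeout-like exception with the
        // real cause buried one level down.
        Exception inner = new IllegalStateException("unknown topic");
        ExecutionException outer =
            new ExecutionException(new RuntimeException(inner));
        System.out.println(causedBy(outer, IllegalStateException.class)); // true
        System.out.println(causedBy(outer, ArithmeticException.class));   // false
    }
}
```

As the issue notes, this only works cleanly when the Admin instance isn't shared with calls where retries are desirable.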
[jira] [Resolved] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-12879. --- Reviewer: Randall Hauch Resolution: Fixed
[jira] [Updated] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-12879: -- Fix Version/s: 3.2.0
[jira] [Updated] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-12879: -- Fix Version/s: 3.0.2 2.7.3 2.6.4 2.5.2 2.8.2 3.1.1
[jira] [Commented] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504584#comment-17504584 ] Randall Hauch commented on KAFKA-12879: --- Update: I'm having trouble with backporting the Connect changes to the 2.5 branch. Because this branch is so old, I'm going to just revert the AdminClient behavior to the 2.5 branch, and _NOT_ backport the Connect retry changes.
[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503891#comment-17503891 ]

Randall Hauch edited comment on KAFKA-12879 at 3/10/22, 9:13 PM:
-----------------------------------------------------------------
The approach we decided to take was to revert the previous admin client changes from KAFKA-12339 to bring the admin client behavior back to previous expectations, and to implement retries within the KafkaBasedLog to handle cases like those identified in that issue.

For example, a likely root cause of KAFKA-12339 was that a Connect worker instantiates its KafkaConfigBackingStore (and other internal topic stores), which creates a KafkaBasedLog that, as part of start(), creates the topic if it doesn't exist and then immediately tries to read the offsets. That reading of offsets can fail if the metadata for the newly created topic hasn't been propagated to all of the brokers. We can solve this particular root cause easily by retrying the reading of offsets within the KafkaBasedLog's start() method, and since topic metadata should be propagated relatively quickly, we don't need to retry for long; most of the time we'd succeed within a few retries.

I've just merged to trunk a PR that does this. When trying to backport it, some of the newer tests were flaky, so [~pnee] created another PR (plus another) to hopefully eliminate that flakiness, and it seemed to work. I'm in the process of backporting this all the way back to the 2.6 branch (originally 2.5), since that's how far back the regression from KAFKA-12339 was backported.
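The retry-within-start() approach described in the comment above can be sketched as a small bounded-retry helper. This is an illustrative sketch, not the actual Connect KafkaBasedLog code; the method names and timings are assumptions.

```java
// Hypothetical sketch of the retry idea from the comment above: retry an
// operation that can fail transiently (e.g. reading end offsets while topic
// metadata is still propagating) a bounded number of times with a short backoff.
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BoundedRetry {
    public static <T> T retry(Supplier<T> op, int maxAttempts, long backoffMs)
            throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e; // e.g. metadata not yet propagated to all brokers
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs); // propagation is usually quick
                }
            }
        }
        throw last; // exhausted all attempts
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Simulate a read that fails twice (stale metadata), then succeeds.
        String result = retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("metadata not yet propagated");
            }
            return "offsets read";
        }, 5, 10L);
        System.out.println(result + " after " + calls.get() + " attempts"); // offsets read after 3 attempts
    }
}
```

Because the failure window is short, a small attempt count and backoff suffice, which matches the comment's point that the retries "don't need to retry for long".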
[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)
C0urante commented on a change in pull request #11773: URL: https://github.com/apache/kafka/pull/11773#discussion_r824130593 ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ## @@ -28,4 +30,46 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + +/** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * + * For backwards compatibility, the default implementation will return {@code null}, but connector developers are + * strongly encouraged to override this method to return a non-null value such as + * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}. + * + * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the + * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support. Review comment: Oooh, that's an interesting idea. I think your point about the connector doing extra work is my biggest concern on that front, and it seems to be validated (heh) by how `validate` is designed (we never actually invoke `validate` on the same `Connector` instances that we invoke `start` on at the moment). It may not be very frequent, but there certainly are some connectors out there that do some heavy lifting in `start` that we wouldn't want to do every time during preflight validation. 
For a practical example, see MirrorMaker2: https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L120-L128

Adding a `configure` method would help with this issue by allowing us to give a long-lived config to a `Connector` that the connector can hold onto across successive invocations of `exactlyOnceSupport` and `canDefineTransactionBoundaries` (and possibly even `validate`). But, it would also complicate some of the existing preflight validation logic in the framework. Right now, we only create a single `Connector` instance per connector type per worker for preflight validation, and all invocations of `validate` and `config` are performed with that instance (see `AbstractHerder` [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L431), [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L664-L666), and [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L461-L479)).

Since we use these validation-only `Connector` instances pretty loosely and without any guarantees about order of invocations for `config` and `validate`, it's unlikely that there are connectors out there that store state in either of these methods, so there could be relatively low risk for creating a new validation-only `Connector` instance every time a config has to be validated. But I have to wonder if we're overthinking things? It seems like we're trying to optimize away from methods that accept a raw config map and instead only provide that config once per round of validation.
Is the objective there to create a smoother/better-documented/more-strictly-defined API? Improve resource utilization by obviating the need of connector developers to construct `AbstractConfig` subclasses (or equivalents) which parse and validate individual properties at instantiation time? As an aside--I think the language in the Javadoc may need to be revisited since, as you note, it implies that these new methods will be invoked before `start`, which is not necessarily the case (for example, in the current preflight validation PR, use of these methods is mutually exclusive with the `start` method for a given `Connector` instance). **TODO: Rework Javadoc once exact behavior is agreed upon.** -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
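The `configure(Map)` idea discussed in this thread can be sketched as follows. This is a purely hypothetical API shape, not part of the Connect framework; the interface, class, and property names are invented for illustration.

```java
// Hypothetical sketch of the configure(Map) idea from the discussion above:
// hand the Connector its config once, so later calls like exactlyOnceSupport()
// can reuse cached state instead of re-parsing a raw config map each time.
import java.util.Map;

public class ConfigureSketch {
    enum ExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

    interface ConfigurableConnector {
        void configure(Map<String, String> config);   // called once, before the other methods
        ExactlyOnceSupport exactlyOnceSupport();      // may rely on state cached by configure()
    }

    static class DemoConnector implements ConfigurableConnector {
        private boolean transactionalSource;

        @Override
        public void configure(Map<String, String> config) {
            // Parse once; heavy lifting of the kind done in some connectors'
            // start() methods is avoided on every preflight-validation call.
            this.transactionalSource = Boolean.parseBoolean(
                    config.getOrDefault("transactional.source", "false"));
        }

        @Override
        public ExactlyOnceSupport exactlyOnceSupport() {
            return transactionalSource ? ExactlyOnceSupport.SUPPORTED
                                       : ExactlyOnceSupport.UNSUPPORTED;
        }
    }

    public static void main(String[] args) {
        DemoConnector connector = new DemoConnector();
        connector.configure(Map.of("transactional.source", "true"));
        System.out.println(connector.exactlyOnceSupport()); // SUPPORTED
    }
}
```

As the thread notes, the trade-off is that the framework would then have to guarantee `configure` runs before the other methods, which complicates the existing loosely ordered validation-only `Connector` instances.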
[GitHub] [kafka] junrao commented on pull request #11815: MINOR: Eliminate FileRecords mutable flag
junrao commented on pull request #11815: URL: https://github.com/apache/kafka/pull/11815#issuecomment-1064456133 @kowshik : Thanks for the investigation. I agree that this does seem to justify keeping the mutable flag.
[jira] [Assigned] (KAFKA-13699) ProcessorContext does not expose Stream Time
[ https://issues.apache.org/jira/browse/KAFKA-13699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-13699:
---------------------------------------
    Assignee: Matthias J. Sax

> ProcessorContext does not expose Stream Time
> --------------------------------------------
>
>                 Key: KAFKA-13699
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13699
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Shay Lin
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: newbie
>
> As a KS developer, I would like to leverage [KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext] and access stream time in the ProcessorContext.
> _(Updated)_ However, the methods currentStreamTimeMs and currentSystemTimeMs are missing for KStreams 3.0+. Checked with [~mjsax]; the methods are absent from the Processor API, i.e.
> * org.apache.kafka.streams.processor.api.ProcessorContext
[GitHub] [kafka] mjsax opened a new pull request #11877: KAKFA-13699: new ProcessorContext is missing methods
mjsax opened a new pull request #11877: URL: https://github.com/apache/kafka/pull/11877 KIP-622 added `currentSystemTimeMs()` and `currentStreamTimeMs()` to the `ProcessorContext`, but forgot to add both to the new `api.ProcessorContext`.
[GitHub] [kafka] ableegoldman commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman commented on pull request #11868: URL: https://github.com/apache/kafka/pull/11868#issuecomment-1064453252 Merged to trunk
[GitHub] [kafka] ableegoldman merged pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman merged pull request #11868: URL: https://github.com/apache/kafka/pull/11868
[GitHub] [kafka] ableegoldman commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman commented on pull request #11868: URL: https://github.com/apache/kafka/pull/11868#issuecomment-1064452750 Test failures are unrelated. Merging
[GitHub] [kafka] bbejeck commented on pull request #11876: Minor typo: "result is a" > "result in a"
bbejeck commented on pull request #11876: URL: https://github.com/apache/kafka/pull/11876#issuecomment-1064398452 Thanks for the contribution @aSemy! Could you file an identical PR for https://github.com/apache/kafka-site/blob/asf-site/30/streams/developer-guide/dsl-api.html and https://github.com/apache/kafka-site/blob/asf-site/31/streams/developer-guide/dsl-api.html (both in the same PR)
[GitHub] [kafka] bbejeck commented on pull request #11876: Minor typo: "result is a" > "result in a"
bbejeck commented on pull request #11876: URL: https://github.com/apache/kafka/pull/11876#issuecomment-1064394560 merged into trunk
[GitHub] [kafka] bbejeck merged pull request #11876: Minor typo: "result is a" > "result in a"
bbejeck merged pull request #11876: URL: https://github.com/apache/kafka/pull/11876
[GitHub] [kafka] bbejeck commented on pull request #11876: Minor typo: "result is a" > "result in a"
bbejeck commented on pull request #11876: URL: https://github.com/apache/kafka/pull/11876#issuecomment-1064390638 PR is HTML only, so merging now
[GitHub] [kafka] aSemy opened a new pull request #11876: Minor typo: "result _is_ a" > "result _in_ a"
aSemy opened a new pull request #11876: URL: https://github.com/apache/kafka/pull/11876
[GitHub] [kafka] mimaison commented on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient
mimaison commented on pull request #11838: URL: https://github.com/apache/kafka/pull/11838#issuecomment-1064341677 Thanks for the PR @blcksrx! I'll try to take a look in the next few days
[GitHub] [kafka] mimaison commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)
mimaison commented on a change in pull request #11773: URL: https://github.com/apache/kafka/pull/11773#discussion_r824001349

## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java

Review comment: Potentially controversial idea: Have you considered always calling `start()` first? That way the connector should have computed its configuration and should be able to easily handle `exactlyOnceSupport()` and `canDefineTransactionBoundaries()`. Obviously `start()` may cause the connector to do some work before the runtime stops it in case of an invalid configuration. But since the proposed javadoc hinted it could be called first anyway, it's not making things worse.

The configuration validation logic is getting more and more complex and showing the limits of the current APIs. I wonder if having a `configure(Map)` method would help us.
[GitHub] [kafka] wcarlson5 closed pull request #11866: MINOR: Offset Result is separate
wcarlson5 closed pull request #11866: URL: https://github.com/apache/kafka/pull/11866
[GitHub] [kafka] wcarlson5 edited a comment on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
wcarlson5 edited a comment on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064230020

@dajac @ijuma Hey, I expanded the PR description a bit to include everything we know. I was also not sure how this causes a performance impact. It seems innocuous but it has a clear impact on our benchmarks. @lihaosky is looking to find out more. It looks like a buffer in the network client was overflowing, maybe?

Commit 2ccc834faa (the original) ![image](https://user-images.githubusercontent.com/18128741/157704574-d51168cf-7cfd-4103-8e59-50609b9a4049.png)

Commit 59ca166d (the revert) ![image](https://user-images.githubusercontent.com/18128741/157704295-df73de06-2e4d-4a94-bc98-26bb0b8d955f.png)

Let me know if there is anything else you would like to know
[GitHub] [kafka] ijuma commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
ijuma commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064164621 Is this method being called in a hot loop? One difference is that there is a string concatenation if the reason is not null.
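ijuma's concern above is that even a small unconditional string concatenation allocates on every call when it sits in a hot path. A hypothetical illustration (not the actual consumer internals; names are invented) of guarding the concatenation:

```java
// Illustrates the hot-loop concern above: an unconditional concatenation
// allocates a new String on every call, while a null/empty guard keeps the
// common case allocation-free. Hypothetical code, not Kafka's implementation.
public class JoinReasonDemo {
    static String joinReasonEager(String reason) {
        return "rebalance triggered: " + reason; // allocates even when reason is null
    }

    static String joinReasonGuarded(String reason) {
        return (reason == null || reason.isEmpty())
                ? ""                                 // hot path: no new allocation
                : "rebalance triggered: " + reason;  // rare path: build the message
    }

    public static void main(String[] args) {
        System.out.println(joinReasonGuarded(null).isEmpty());       // true
        System.out.println(joinReasonGuarded("followup rebalance")); // rebalance triggered: followup rebalance
    }
}
```

Whether this particular allocation explains the benchmark regression discussed in the thread is exactly what the participants are trying to determine; the sketch only shows the pattern being questioned.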
[GitHub] [kafka] ijuma commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
ijuma commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064157672 We should not revert this without providing more detailed information; the PR doesn't include enough information.
[GitHub] [kafka] ijuma commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
ijuma commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064156544 Have you verified that this revert actually fixes the regression?
[GitHub] [kafka] dajac commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
dajac commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1063998412 @wcarlson5 It would be great if we could get to the bottom of this. I don't really understand how that could impact performance.
[GitHub] [kafka] cadonna commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
cadonna commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1063957270 Restarted builds
[GitHub] [kafka] kowshik edited a comment on pull request #11815: MINOR: Eliminate FileRecords mutable flag
kowshik edited a comment on pull request #11815: URL: https://github.com/apache/kafka/pull/11815#issuecomment-1063904350

@junrao Thanks for catching. I found the test failures to be valid. Thinking about it again, now I'm not sure if it is feasible to proceed with eliminating the `mutable` flag, because it appears to be used actively. For example, there is code in `FileRawSnapshotWriter` which sets the file to read-only mode: https://github.com/apache/kafka/blob/0f00f36/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java#L113. As a result, `FileRawSnapshotReader` can open the file only in read mode, and therefore it needs to pass `mutable=false` to `FileRecords.open()`. Most of the test failures are due to this. Please let me know if I'm missing something, or if there is an alternate approach we could take here.
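The constraint kowshik describes can be demonstrated with plain `java.nio`: once a file is frozen read-only, a reader opens it in read mode, and the resulting channel rejects writes at the JVM level. This is a stand-alone illustration (file names are made up), not the actual FileRecords or snapshot code.

```java
// Demonstrates the read-only constraint described above: after a snapshot
// file is frozen, a read-mode channel (mutable=false, in FileRecords terms)
// rejects any write with NonWritableChannelException.
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.StandardOpenOption;

public class ReadOnlySnapshotDemo {
    public static void main(String[] args) throws IOException {
        File snapshot = File.createTempFile("snapshot", ".bin");
        snapshot.deleteOnExit();
        snapshot.setReadOnly(); // analogous to FileRawSnapshotWriter freezing the file

        // Opening in read mode works fine...
        try (FileChannel ch = FileChannel.open(snapshot.toPath(), StandardOpenOption.READ)) {
            System.out.println("opened read-only, size=" + ch.size());
            // ...but any write through a read-only channel is rejected.
            try {
                ch.write(ByteBuffer.wrap(new byte[]{1}));
                System.out.println("write unexpectedly succeeded");
            } catch (NonWritableChannelException e) {
                System.out.println("write rejected"); // expected
            }
        }
    }
}
```

This is why a hypothetical reader of such a file has no choice but to open it non-mutable, which in turn is why the `mutable` flag cannot simply be eliminated.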
[GitHub] [kafka] cadonna commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags
cadonna commented on pull request #10802: URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063898824 @lkokhreidze The system tests passed: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-03-10--001.system-test-kafka-branch-builder--1646904256--lkokhreidze--KAFKA-6718-part1-subscription-info-changes--3e73e5b962/report.html @showuon Could you merge this PR after your review? (Of course, only if the PR looks good to you)
[GitHub] [kafka] mjsax commented on a change in pull request #11875: KAFKA-13721: asymmetric join-windows should not emit spurious left/outer join results
mjsax commented on a change in pull request #11875: URL: https://github.com/apache/kafka/pull/11875#discussion_r823500646

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java

```diff
@@ -211,7 +214,7 @@ private void emitNonJoinedOuterRecords(final KeyValueStore
-            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
+            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) {
```

Review comment: This is the actual fix.

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java

```diff
@@ -1197,7 +1194,7 @@ public void testAsymmetricWindowingAfter() {
         joined = stream1.join(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(0), ofMillis(0)).after(ofMillis(100)),
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(0)).after(ofMillis(100)),
```

Review comment: Minor side cleanup.

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java

```diff
@@ -53,9 +56,14 @@
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
-    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

-    @SuppressWarnings("deprecation")
+    @BeforeClass
+    public static void beforeClass() {
+        PROPS.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
```

Review comment: Side fix: we need to ensure that we never delay outputting left/outer join results due to wall-clock time not advancing when using TTD.

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java

```diff
@@ -749,6 +757,70 @@ public void testWindowing() {
+    @Test
+    public void shouldNotEmitLeftJoinResultForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).before(ZERO),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            long time = 0L;
+
+            // push two items to the primary stream; the other window is empty; this should not produce any items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = {}
+            for (int i = 0; i < 2; i++) {
+                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
+            }
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should produce one full-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 0:a0 (ts: 100) }
+            time += 100L;
+            inputTopic2.pipeInput(expectedKeys[0], "a" + expectedKeys[0], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+a0", 100L)
```

Review comment: Without the fix, we get a spurious left-join result here.

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java

```diff
@@ -877,7 +949,6 @@ private void testUpperWindowBound(final int[] expectedKeys,
         // push a dummy record to produce all left-join non-joined items
         time += 301L;
-        driver.advanceWallClockTime(Duration.ofMillis(1000L));
```

Review comment:
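The emit condition changed in the fix above can be distilled into a toy predicate. This is a hypothetical sketch (the variable names mirror the diff, but the class and method are mine), showing why an asymmetric window such as `before=0, after=100` must look back by the larger of the two bounds before declaring a non-joined record final:

```java
public class EmitConditionSketch {
    // Returns true when it is still too early to emit the record as a
    // left/outer-join result, because a matching record on the other
    // side could still arrive within the look-back window plus grace.
    static boolean tooEarlyToEmit(long recordMinTime, long streamTime,
                                  long lookBackMs, long graceMs) {
        return recordMinTime >= streamTime - lookBackMs - graceMs;
    }

    public static void main(String[] args) {
        // Window: before=0ms, after=100ms, no grace.
        // Left record at ts=0; stream time has advanced to 100, at which
        // point a right record at ts=100 can still join the left record.
        long lookBack = Math.max(0L, 100L); // max(before, after)

        // Old per-side bound (joinAfterMs from the left side's view = 0):
        // the record wrongly looks emittable -> spurious left-join result.
        System.out.println(tooEarlyToEmit(0, 100, 0, 0));        // false

        // Shared look-back bound (joinSpuriousLookBackTimeMs = 100):
        // the record is correctly held back until the window truly closes.
        System.out.println(tooEarlyToEmit(0, 100, lookBack, 0)); // true
    }
}
```

The real logic lives in `KStreamKStreamJoin#emitNonJoinedOuterRecords`; this sketch only captures the arithmetic of the guard.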
[GitHub] [kafka] mjsax opened a new pull request #11875: KAFKA-13721: asymmetric join-windows should not emit spurious left/outer join results
mjsax opened a new pull request #11875: URL: https://github.com/apache/kafka/pull/11875 Call for review @spena @guozhangwang
[jira] [Assigned] (KAFKA-13721) Left-join still emit spurious results in stream-stream joins in some cases
[ https://issues.apache.org/jira/browse/KAFKA-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-13721: Assignee: Matthias J. Sax

> Left-join still emit spurious results in stream-stream joins in some cases
>
> Key: KAFKA-13721
> URL: https://issues.apache.org/jira/browse/KAFKA-13721
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0
> Reporter: Nollet
> Assignee: Matthias J. Sax
> Priority: Major
>
> Stream-stream joins seem to still emit spurious results for some window configurations. From my tests, it happened when setting before to 0 and having a grace period smaller than the window duration. More precisely, it seems to happen when setting before and window duration > grace period + before.
>
> h2. how to reproduce
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.TestInputTopic;
> import org.apache.kafka.streams.TestOutputTopic;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.TopologyTestDriver;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.KStream;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
>
> import java.time.Duration;
> import java.time.Instant;
> import java.util.Properties;
>
> public class SpuriousLeftJoinTest {
>     static final Duration WINDOW_DURATION = Duration.ofMinutes(10);
>     static final Duration GRACE = Duration.ofMinutes(6);
>     static final Duration BEFORE = Duration.ZERO;
>     static final String LEFT_TOPIC_NAME = "LEFT_TOPIC";
>     static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC";
>     static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC";
>
>     private static TopologyTestDriver testDriver;
>     private static TestInputTopic<String, Integer> inputTopicLeft;
>     private static TestInputTopic<String, Integer> inputTopicRight;
>     private static TestOutputTopic<String, Integer> outputTopic;
>
>     public static Topology createTopology() {
>         StreamsBuilder builder = new StreamsBuilder();
>         KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME);
>         KStream<String, Integer> rightStream = builder.stream(RIGHT_TOPIC_NAME);
>
>         // return 1 if left join matched, otherwise 0
>         KStream<String, Integer> joined = leftStream.leftJoin(
>                 rightStream,
>                 (value1, value2) -> {
>                     if (value2 == null) {
>                         return 0;
>                     }
>                     return 1;
>                 },
>                 JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE)
>                         .before(BEFORE)
>         );
>         joined.to(OUTPUT_TOPIC_NAME);
>         return builder.build();
>     }
>
>     @Before
>     public void setup() {
>         Topology topology = createTopology();
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
>         testDriver = new TopologyTestDriver(topology, props);
>         inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
>         inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
>         outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME, Serdes.String().deserializer(), Serdes.Integer().deserializer());
>     }
>
>     @After
>     public void tearDown() {
>         testDriver.close();
>     }
>
>     @Test
>     public void shouldEmitOnlyOneMessageForKey1() {
>         Instant now = Instant.now();
>         inputTopicLeft.pipeInput("key1", 12, now);
>         inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION));
>         // send later record to increase stream time & close the window
>         inputTopicLeft.pipeInput("other_key", 1212122, now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10));
>         while (!outputTopic.isEmpty()) {
>             System.out.println(outputTopic.readKeyValue());
>         }
>     }
> }
> {code}
> Stdout of previous code is
> {noformat}
> KeyValue(key1, 0)
> KeyValue(key1, 1)
> {noformat}
> However it should be
> {noformat}
> KeyValue(key1, 1)
> {noformat}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504090#comment-17504090 ] Harsha Madduri commented on KAFKA-10160: Thanks for pointing that out, [~mimaison]. My bad, I was looking at an older commit ([https://github.com/apache/kafka/pull/8921/commits/75b1c77542fe8269b89fb4f209520bb59e70ca3e]) and expecting that to be in trunk. I just realized that the code was changed based on some comments. Thanks again! > Kafka MM2 consumer configuration > > > Key: KAFKA-10160 > URL: https://issues.apache.org/jira/browse/KAFKA-10160 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Pavol Ipoth >Assignee: sats >Priority: Major > Labels: configuration, kafka, mirror-maker > Fix For: 2.4.2, 2.8.0 > > > [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,] > according this producer/consumer level properties should be configured as > e.g. somesource->sometarget.consumer.client.id, i try to set > somesource->sometarget.consumer.auto.offset.reset=latest, but without > success, consumer always tries to fetch earliest, not sure if bug or my > misconfiguration, but then at least some update to docu would be useful
[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504077#comment-17504077 ] Mickael Maison commented on KAFKA-10160: [~harsham] This fix is in trunk, see https://github.com/apache/kafka/commit/cf202cb6acf38c64a3e8b9e541673a12ee55eaaa > Kafka MM2 consumer configuration > > > Key: KAFKA-10160 > URL: https://issues.apache.org/jira/browse/KAFKA-10160 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Pavol Ipoth >Assignee: sats >Priority: Major > Labels: configuration, kafka, mirror-maker > Fix For: 2.4.2, 2.8.0 > > > [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,] > according this producer/consumer level properties should be configured as > e.g. somesource->sometarget.consumer.client.id, i try to set > somesource->sometarget.consumer.auto.offset.reset=latest, but without > success, consumer always tries to fetch earliest, not sure if bug or my > misconfiguration, but then at least some update to docu would be useful
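For reference, the per-flow client override discussed in KAFKA-10160 uses the `source->target.consumer.` prefix form. The sketch below is a minimal, hypothetical MirrorMaker 2 properties file using the cluster aliases from the issue (the aliases and bootstrap servers are placeholders, not from the report):

```properties
# Cluster aliases and their bootstrap servers (placeholder hosts)
clusters = somesource, sometarget
somesource.bootstrap.servers = source-broker:9092
sometarget.bootstrap.servers = target-broker:9092

# Enable the somesource -> sometarget replication flow
somesource->sometarget.enabled = true
somesource->sometarget.topics = .*

# Per-flow consumer override from the issue; per the comments above it is
# honored on trunk after commit cf202cb (fix versions 2.4.2 / 2.8.0), while
# affected releases ignored it and always fetched from "earliest":
somesource->sometarget.consumer.auto.offset.reset = latest
```

As the linked `MirrorMakerConfig` code shows, these prefixed properties are stripped of their `source->target.consumer.` prefix and passed to the underlying consumer.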