[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer
kowshik edited a comment on pull request #10280: URL: https://github.com/apache/kafka/pull/10280#issuecomment-861219607 I ran the system tests in `kafkatest.tests.client.consumer_test` again: * [System test run #4564](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4564/) against `trunk/6de37e536ac76ef13530d49dc7320110332cd1ee`. * [System test run #4566](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4566/) against 008b701386ce5a4d892d6ac5b90798b981c4fba0 (the latest commit from this PR). All tests passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on pull request #10280: KAFKA-12554: Refactor Log layer
kowshik commented on pull request #10280: URL: https://github.com/apache/kafka/pull/10280#issuecomment-861219607 I ran the system tests in `kafkatest.tests.client.consumer_test` again: * [4564](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4564/) against `trunk/6de37e536ac76ef13530d49dc7320110332cd1ee`. * [4566](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4566/) against 008b701386ce5a4d892d6ac5b90798b981c4fba0 (the latest commit from this PR). All tests passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer
kowshik edited a comment on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-860537313

@junrao Thanks for the review! I ran the system tests.

1. [System test run #4560](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4560/) on top of the latest commit 008b701386ce5a4d892d6ac5b90798b981c4fba0 from this PR. The run finished with 12 test failures.
2. [System test run #4561](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4561/) against AK trunk on top of commit 6de37e536ac76ef13530d49dc7320110332cd1ee, which does not contain changes from this PR. The run finished with 13 test failures.

There were 11 overlapping failures in both (1) and (2). For these, I haven't found anything abnormal in the logs so far; the failure reason seems similar in both. The only new failure in (1) that's not present in (2) was:

```
Module: kafkatest.tests.client.consumer_test
Class: OffsetValidationTest
Method: test_broker_failure
Arguments: { "clean_shutdown": true, "enable_autocommit": false, "metadata_quorum": "REMOTE_KRAFT" }
```

Logs indicate that the test failed [at this line](https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/tests/kafkatest/tests/client/consumer_test.py#L388) because one of the worker nodes running the consumer didn't complete within the timeout of 30s. This doesn't seem indicative of a real failure (yet), so I'm rerunning the system tests to check whether the failure is consistent. I'll keep you posted on the outcome of this second run.
[GitHub] [kafka] dajac merged pull request #10843: MINOR: Log formatting for exceptions during configuration related operations
dajac merged pull request #10843: URL: https://github.com/apache/kafka/pull/10843
[GitHub] [kafka] dajac commented on pull request #10843: MINOR: Log formatting for exceptions during configuration related operations
dajac commented on pull request #10843: URL: https://github.com/apache/kafka/pull/10843#issuecomment-861202533 @YiDing-Duke Thanks for the patch! Merging to trunk and 2.8.
[GitHub] [kafka] showuon commented on pull request #10871: KAFKA-8940: decrease session timeout to make test faster and reliable
showuon commented on pull request #10871: URL: https://github.com/apache/kafka/pull/10871#issuecomment-861147367 @ableegoldman, thanks for the good reminder. I totally agree with you. I've updated the PR description and added a comment on the JIRA ticket. Thank you.
[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363356#comment-17363356 ] A. Sophie Blee-Goldman commented on KAFKA-8295: --- Yes, any of the count DSL operators. It may be a bit more tricky than it appears on the surface because count is actually converted into a generic aggregation under the covers, so you'd have to tease it out into its own independent optimized implementation. To be honest, I don't have a good sense of whether it's even worth the additional code complexity, because I don't know how much additional code and/or code paths this will introduce :) I recommend looking into that before jumping straight in. Of course, we could consider introducing some kind of top-level merge-based operator to the DSL as a feature in its own right. Then count could just be converted to use that instead of the aggregation implementation. Not sure what that would look like, or if it would even be useful at all – just throwing out thoughts here. Anyways I just thought it would be interesting to explore what we might be able to do with this merge operator in Kafka Streams, whether that's an optimization of existing operators or some kind of first class operator of its own. That's really the point of this ticket: to explore the merge operator. > Optimize count() using RocksDB merge operator > - > > Key: KAFKA-8295 > URL: https://issues.apache.org/jira/browse/KAFKA-8295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > > In addition to regular put/get/delete RocksDB provides a fourth operation, > merge. This essentially provides an optimized read/update/write path in a > single operation. One of the built-in (C++) merge operators exposed over the > Java API is a counter. 
We should be able to leverage this for a more > efficient implementation of count() > > (Note: Unfortunately it seems unlikely we can use this to optimize general > aggregations, even if RocksJava allowed for a custom merge operator, unless > we provide a way for the user to specify and connect a C++ implemented > aggregator – otherwise we incur too much cost crossing the jni for a net > performance benefit) -- This message was sent by Atlassian Jira (v8.3.4#803005)
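To make the difference between the read/update/write path and a single merge call concrete, here is a minimal Java sketch. It uses `ConcurrentHashMap.merge` from the JDK as a stand-in for a store with a built-in counter merge operator. This is an assumption made for illustration only: it is not RocksDB, RocksJava, or Kafka Streams code, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a key-value store; not RocksDB or Kafka Streams code.
final class MergeCountSketch {
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    // Classic path: read the old count, update it, write it back.
    void countWithGetPut(String key) {
        Long old = store.get(key);
        store.put(key, old == null ? 1L : old + 1L);
    }

    // Merge path: hand the store a delta and let it combine the delta
    // with whatever value is already there, in a single call.
    void countWithMerge(String key) {
        store.merge(key, 1L, Long::sum);
    }

    long count(String key) {
        return store.getOrDefault(key, 0L);
    }
}
```

Both methods produce the same counts; the merge variant simply moves the read-modify-write step inside the store, which is the property the ticket hopes to exploit.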
[jira] [Commented] (KAFKA-12844) KIP-740 follow up: clean up TaskId
[ https://issues.apache.org/jira/browse/KAFKA-12844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363355#comment-17363355 ] A. Sophie Blee-Goldman commented on KAFKA-12844: As mentioned elsewhere, this ticket is marked for 4.0 and as such, cannot be worked on yet. When 4.0 is announced you are free to pick this up and work on it again, but as of this point we don't yet know when version 4.0 will be released. Most likely we will have 3.1 after the in-progress 3.0, though I suppose it depends on the Zookeeper removal work. > KIP-740 follow up: clean up TaskId > -- > > Key: KAFKA-12844 > URL: https://issues.apache.org/jira/browse/KAFKA-12844 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: loboxu >Priority: Blocker > Fix For: 4.0.0 > > > See > [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > – for the TaskId class, we need to remove the following deprecated APIs: > # The public partition and topicGroupId fields should be "removed", ie made > private (can also now rename topicGroupId to subtopology to match the getter) > # The two #readFrom and two #writeTo methods can be removed (they have > already been converted to internal utility methods we now use instead, so > just remove them) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363354#comment-17363354 ] A. Sophie Blee-Goldman commented on KAFKA-12843: This was pointed out on the PR, but just leaving a note on the ticket for visibility, plus to clarify for anyone else who comes across this: This ticket is marked for fix in version 4.0, which means we can't work on it yet. We are only in the process of releasing 3.0 at the moment, and it's likely that 3.1 will come after that. Once the bump to 4.0 has been decided, you can pick this up again and actually work on it. > KIP-740 follow up: clean up TaskMetadata > > > Key: KAFKA-12843 > URL: https://issues.apache.org/jira/browse/KAFKA-12843 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: loboxu >Priority: Blocker > Fix For: 4.0.0 > > > See > [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > – for the TaskMetadata class, we need to: > # Deprecate the TaskMetadata#getTaskId method > # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() > API that returns a TaskId instead of a String > # Remove the deprecated constructor
[GitHub] [kafka] ableegoldman commented on pull request #10871: KAFKA-8940: decrease session timeout to make test faster and reliable
ableegoldman commented on pull request #10871: URL: https://github.com/apache/kafka/pull/10871#issuecomment-861138874 Thanks @showuon. Can you add a comment or update the description with the specific error message in the failure mode that this fix is intended to address? As you point out, my analysis of the test from a while back shows that we need to shore up either the input data production or the output verification itself to get this totally correct. You can detect when the failure is due to that bug in the test assumptions because the associated error is the `java.lang.AssertionError: verifying tagg` exception message. It would be good to explicitly point out what kind of failure (ie the error message/exception/stacktrace) this fix was directed at, so we can keep an eye out for it and adjust the session timeout further if necessary. (I don't really expect it will, but you know how it is 🙂 ) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12690) Remove deprecated Producer#sendOffsetsToTransaction
[ https://issues.apache.org/jira/browse/KAFKA-12690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363343#comment-17363343 ] A. Sophie Blee-Goldman commented on KAFKA-12690: Hey [~loboxu], as pointed out elsewhere this ticket is marked for 4.0, and is not yet ready to be worked on. You're absolutely free to pick this up when the time comes around, so you can leave yourself assigned if you'd like, but it may be a while before 4.0 comes around. Just a heads up :) > Remove deprecated Producer#sendOffsetsToTransaction > --- > > Key: KAFKA-12690 > URL: https://issues.apache.org/jira/browse/KAFKA-12690 > Project: Kafka > Issue Type: Task > Components: producer >Reporter: A. Sophie Blee-Goldman >Assignee: loboxu >Priority: Blocker > Fix For: 4.0.0 > > > In > [KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2] > we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in > StreamsConfig, to be removed in 4.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12689) Remove deprecated EOS configs
[ https://issues.apache.org/jira/browse/KAFKA-12689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363342#comment-17363342 ] A. Sophie Blee-Goldman commented on KAFKA-12689: Hey [~loboxu], this ticket is marked with 4.0 as the Fix Version, which means it can't be worked on until the version 4.0. This version has yet to be announced, and since 3.0 is only just about to be released I would not assume that 4.0 is definitely right around the corner. Most likely 4.0 will be soon-ish, but I would look for other tickets to work on for now. > Remove deprecated EOS configs > - > > Key: KAFKA-12689 > URL: https://issues.apache.org/jira/browse/KAFKA-12689 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: loboxu >Priority: Blocker > Fix For: 4.0.0 > > > In > [KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2] > we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in > StreamsConfig, to be removed in 4.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #10878: KAFKA-12898; Owned partitions in the subscription must be sorted
ableegoldman commented on a change in pull request #10878:
URL: https://github.com/apache/kafka/pull/10878#discussion_r651411365

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java ##
@@ -70,16 +71,24 @@ public static ByteBuffer serializeSubscription(final Subscription subscription,
         version = checkSubscriptionVersion(version);
         ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
-        data.setTopics(subscription.topics());
+
+        List<String> topics = new ArrayList<>(subscription.topics());
+        topics.sort(null);

Review comment:
```suggestion
        Collections.sort(topics);
```

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java ##
@@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals;
 import java.nio.BufferUnderflowException;
+import java.util.Comparator;

Review comment: nit: move these below the `o.a.k` imports
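For readers comparing the two spellings in the suggestion above, here is a small self-contained Java sketch. It only illustrates the copy-then-sort pattern from the diff; `SortedCopySketch` and `sortedCopy` are hypothetical names, not part of `ConsumerProtocol`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; it mirrors the copy-then-sort pattern in the diff above.
final class SortedCopySketch {
    // Returns a sorted copy so that the caller's list is left untouched.
    static List<String> sortedCopy(List<String> topics) {
        List<String> copy = new ArrayList<>(topics);
        // Natural ordering; equivalent in effect to copy.sort(null).
        Collections.sort(copy);
        return copy;
    }
}
```

Sorting a copy rather than the input keeps the caller's list in its original order, which matters when the list may be shared or unmodifiable.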
[GitHub] [kafka] ijuma commented on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh
ijuma commented on pull request #10883: URL: https://github.com/apache/kafka/pull/10883#issuecomment-861129160 @hachikuji @jsancio do we want to keep this class? It's the second time it breaks so we either need to improve test coverage or we should remove it.
[GitHub] [kafka] ableegoldman commented on pull request #10876: KAFKA-12843: KIP-740 follow up: clean up TaskMetadata
ableegoldman commented on pull request #10876: URL: https://github.com/apache/kafka/pull/10876#issuecomment-861126045 Thanks for the PR, but @jlprat is correct. Unfortunately this ticket will need to wait for the 4.0 release, which as of this time has not yet even been announced. You can work on this when that time does come around, but for now I recommend looking for another ticket that can actually be worked on right away. Thanks!
[GitHub] [kafka] showuon edited a comment on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector
showuon edited a comment on pull request #10367: URL: https://github.com/apache/kafka/pull/10367#issuecomment-861125710 @kkonstantine, could you check this PR? Or should I find another reviewer, since it's been 3 months? Thanks.
[GitHub] [kafka] showuon commented on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector
showuon commented on pull request #10367: URL: https://github.com/apache/kafka/pull/10367#issuecomment-861125710 @kkonstantine, could you check this PR? Or should I find another reviewer? Thanks.
[jira] [Comment Edited] (KAFKA-7360) Code example in "Accessing Processor Context" misses a closing parenthesis
[ https://issues.apache.org/jira/browse/KAFKA-7360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362470#comment-17362470 ] Vijay edited comment on KAFKA-7360 at 6/15/21, 2:39 AM: Can this be assigned to me ? I'd like to work on this. was (Author: vijaykriishna): Can you please assign it to me ? > Code example in "Accessing Processor Context" misses a closing parenthesis > -- > > Key: KAFKA-7360 > URL: https://issues.apache.org/jira/browse/KAFKA-7360 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 2.0.0 >Reporter: Sven Erik Knop >Priority: Minor > > https://kafka.apache.org/20/documentation/streams/developer-guide/processor-api.html#accessing-processor-context > Code example has some issues: > public void process(String key, String value) { > > // add a header to the elements > context().headers().add.("key", "key" > } > Should be > public void process(String key, String value) { > > // add a header to the elements > context().headers().add("key", "value") > } -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-7302) Remove Java7 examples from Streams Docs
[ https://issues.apache.org/jira/browse/KAFKA-7302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362471#comment-17362471 ] Vijay edited comment on KAFKA-7302 at 6/15/21, 2:38 AM: Can this be assigned to me ? I'd like to work on this. was (Author: vijaykriishna): Can you please assign it to me ? > Remove Java7 examples from Streams Docs > --- > > Key: KAFKA-7302 > URL: https://issues.apache.org/jira/browse/KAFKA-7302 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Priority: Minor > > In 2.0 release, Java7 support was dropped. We might consider removing Java7 > examples from the Streams docs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] iamgd67 commented on pull request #10818: KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
iamgd67 commented on pull request #10818: URL: https://github.com/apache/kafka/pull/10818#issuecomment-861117484 @guozhangwang could you please review this, thanks in advance.
[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651390233

## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ##
@@ -2285,7 +2287,11 @@ QuorumState quorum() {
     }

     public OptionalLong highWatermark() {
-        return quorum.highWatermark().isPresent() ? OptionalLong.of(quorum.highWatermark().get().offset) : OptionalLong.empty();
+        if (quorum.highWatermark().isPresent()) {
+            return OptionalLong.of(quorum.highWatermark().get().offset);
+        } else {
+            return OptionalLong.empty();
+        }

Review comment: No functional change here. Just a formatting change. Always found this line hard to read and I had to fix it :smile:
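The conversion in this diff, from an `Optional` of some metadata object to an `OptionalLong`, can be tried in isolation with the sketch below. `HighWatermarkSketch` and its nested `OffsetMetadata` class are hypothetical stand-ins introduced for illustration; only the `java.util.Optional` and `java.util.OptionalLong` calls are real JDK API.

```java
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical sketch; the nested class stands in for the object carrying the offset.
final class HighWatermarkSketch {
    static final class OffsetMetadata {
        final long offset;

        OffsetMetadata(long offset) {
            this.offset = offset;
        }
    }

    // Same shape as the reformatted method: unwrap if present, else return empty.
    static OptionalLong offsetOf(Optional<OffsetMetadata> highWatermark) {
        if (highWatermark.isPresent()) {
            return OptionalLong.of(highWatermark.get().offset);
        } else {
            return OptionalLong.empty();
        }
    }
}
```

The explicit branch is used because `Optional.map` would yield an `Optional<Long>`, not the primitive-specialised `OptionalLong`.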
[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r651387879 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java ## @@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) { * Create a writable snapshot for the given snapshot id. * * See {@link RawSnapshotWriter} for details on how to use this object. The caller of - * this method is responsible for invoking {@link RawSnapshotWriter#close()}. + * this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a + * snapshot already exists then return an {@link Optional#empty()}. * * @param snapshotId the end offset and epoch that identifies the snapshot - * @return a writable snapshot + * @param validate validate the snapshot id against the log + * @return a writable snapshot if it doesn't already exists + * @throws IllegalArgumentException if validate is true and end offset is greater than the + * high-watermark + * @throws IllegalArgumentException if validate is true and end offset is less than the log + * start offset */ -RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId); +Optional createSnapshot(OffsetAndEpoch snapshotId, boolean validate); Review comment: Done. Used these suggestions. I couldn't think of better names. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
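The contract described in the Javadoc above (empty result if the snapshot already exists, `IllegalArgumentException` when validation is on and the end offset is out of range) can be sketched with a small in-memory model. Everything here is hypothetical: `SnapshotLogSketch` is not the `ReplicatedLog` API, it ignores epochs entirely, it returns the end offset in place of a writer object, and the order of the checks is an assumption.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical in-memory model; illustrative only, not the ReplicatedLog API.
final class SnapshotLogSketch {
    private final long logStartOffset;
    private final long highWatermark;
    private final Set<Long> existingSnapshots = new HashSet<>();

    SnapshotLogSketch(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    // Returns the end offset as a stand-in for a writer, or empty if a
    // snapshot with that end offset already exists.
    Optional<Long> createSnapshot(long endOffset, boolean validate) {
        if (validate) {
            if (endOffset > highWatermark) {
                throw new IllegalArgumentException("end offset " + endOffset
                    + " is greater than the high-watermark " + highWatermark);
            }
            if (endOffset < logStartOffset) {
                throw new IllegalArgumentException("end offset " + endOffset
                    + " is less than the log start offset " + logStartOffset);
            }
        }
        // Set.add returns false when the element was already present.
        if (!existingSnapshots.add(endOffset)) {
            return Optional.empty();
        }
        return Optional.of(endOffset);
    }
}
```

Returning `Optional` lets a caller treat "already exists" as a normal outcome, while the exceptions are reserved for ids that fail validation.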
[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r651387678 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ## @@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() { return snapshot.snapshotId(); } +/** + * Returns the last log offset which is represented in the snapshot. + */ +public long lastOffsetFromLog() { Review comment: I am up for that. Do you mind if I file a jira for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r651387182 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java ## @@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException { appendBatch(numberOfRecords, epoch); log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); -try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) { +try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, true).get()) { snapshot.freeze(); } RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get(); assertEquals(0, snapshot.sizeInBytes()); } +@Test +public void testCreateSnapshotValidation() { +int numberOfRecords = 10; +int firstEpoch = 1; +int secondEpoch = 3; + +appendBatch(numberOfRecords, firstEpoch); +appendBatch(numberOfRecords, secondEpoch); +log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)); + +// Test snapshot id for the first epoch +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { } +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { } +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(1, firstEpoch), true).get()) { } + +// Test snapshot id for the second epoch +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { } +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { } +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { } +} + +@Test +public void testCreateSnapshotLaterThanHighWatermark() { +int numberOfRecords = 10; +int epoch = 1; + +appendBatch(numberOfRecords, epoch); +log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + +assertThrows( 
+IllegalArgumentException.class, +() -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch), true) +); +} + +@Test +public void testCreateSnapshotBeforeLogStartOffset() { Review comment: Added a few more tests: one for a much larger epoch, one for a much smaller epoch and one for a missing epoch.
[GitHub] [kafka] IgnacioAcunaF commented on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh
IgnacioAcunaF commented on pull request #10883: URL: https://github.com/apache/kafka/pull/10883#issuecomment-861104058 PING @ijuma (as I saw you on a PR related to test-kraft-server-start.sh [KAFKA-12672])
[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
hachikuji commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r651378054 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ## @@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() { return snapshot.snapshotId(); } +/** + * Returns the last log offset which is represented in the snapshot. + */ +public long lastOffsetFromLog() { Review comment: Yeah, makes sense. I was sort of considering if it would be useful to have a `SnapshotId` object. Currently we use `OffsetAndEpoch` in other cases, but maybe a separate object would let us have better names. It would also let us define inclusive and exclusive methods. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r651374491 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ## @@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() { return snapshot.snapshotId(); } +/** + * Returns the last log offset which is represented in the snapshot. + */ +public long lastOffsetFromLog() { Review comment: One of the users of this API was confused by this offset and the offsets returned in the `Batch` by the `SnapshotReader` iterator. I wanted to make it clear that this offset and epoch refer to the offset and epoch found in the `ReplicatedLog` or `handleCommit`, while the offsets reported by the `Batch` for the `SnapshotReader` iterator are unrelated to the log's offsets. How about `lastContainedLogOffset` and `lastContainedLogEpoch`?
[GitHub] [kafka] IgnacioAcunaF commented on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh
IgnacioAcunaF commented on pull request #10883: URL: https://github.com/apache/kafka/pull/10883#issuecomment-861084792 With no change: ![image](https://user-images.githubusercontent.com/31544929/121976023-f49deb80-cd50-11eb-8fa7-94b7c4923751.png) With the change: ![image](https://user-images.githubusercontent.com/31544929/121975932-c5877a00-cd50-11eb-8a4a-bf930a1982dc.png)
[GitHub] [kafka] mjsax commented on pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API
mjsax commented on pull request #10861: URL: https://github.com/apache/kafka/pull/10861#issuecomment-861081018 Replied to comments and rebased to resolve merge conflicts. \cc @guozhangwang
[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API
mjsax commented on a change in pull request #10861: URL: https://github.com/apache/kafka/pull/10861#discussion_r651363243 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -82,20 +87,23 @@ private class KStreamKStreamJoinProcessor extends AbstractProcessor { private WindowStore otherWindowStore; -private StreamsMetricsImpl metrics; private Sensor droppedRecordsSensor; private Optional, LeftOrRightValue>> outerJoinWindowStore = Optional.empty(); -@SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); -metrics = (StreamsMetricsImpl) context.metrics(); +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); otherWindowStore = context.getStateStore(otherWindowName); -if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) { -outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name)); +if (enableSpuriousResultFix Review comment: I don't think that checking the condition twice is a real issue? Also, it seems better to do the check, because otherwise (if we have a bug and incorrectly get a `null` store back) we might mask the bug?
[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API
mjsax commented on a change in pull request #10861: URL: https://github.com/apache/kafka/pull/10861#discussion_r651362625 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ## @@ -76,18 +76,37 @@ private final long graceMs; +protected final boolean enableSpuriousResultFix; + +protected JoinWindows(final JoinWindows joinWindows) { Review comment: Not really possible. Multiple issues: - we still need a protected constructor in `JoinWindows` - we need to make `graceMs` protected - blocker: we need to make the _public_ members `beforeMs` and `afterMs` non-final (and we cannot do this...) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] IgnacioAcunaF opened a new pull request #10883: KAFKA-12949: Add catch case to avoid scala.MatchError null
IgnacioAcunaF opened a new pull request #10883: URL: https://github.com/apache/kafka/pull/10883 Encountered the following exception when trying to run the TestRaftServer: `bin/test-kraft-server-start.sh --config config/kraft.properties` ``` [2021-06-14 17:15:43,232] ERROR [raft-workload-generator]: Error due to (kafka.tools.TestRaftServer$RaftWorkloadGenerator) scala.MatchError: null at kafka.tools.TestRaftServer$RaftWorkloadGenerator.doWork(TestRaftServer.scala:220) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [2021-06-14 17:15:43,253] INFO [raft-workload-generator]: Stopped (kafka.tools.TestRaftServer$RaftWorkloadGenerator) ``` This is caused by an unhandled *null* returned from eventQueue.poll: ``` eventQueue.poll(eventTimeoutMs, TimeUnit.MILLISECONDS) match { case HandleClaim(epoch) => claimedEpoch = Some(epoch) throttler.reset() pendingAppends.clear() recordCount.set(0) case HandleResign => claimedEpoch = None pendingAppends.clear() case HandleCommit(reader) => try { while (reader.hasNext) { val batch = reader.next() claimedEpoch.foreach { leaderEpoch => handleLeaderCommit(leaderEpoch, batch) } } } finally { reader.close() } case HandleSnapshot(reader) => // Ignore snapshots; only interested in records appended by this leader reader.close() case Shutdown => // Ignore shutdown command } ``` This causes the raft-workload-generator thread to stop. Proposal: Add a catch-all case to the match statement: `case _ => // Ignore other events (such as null)` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
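For context, `BlockingQueue.poll(timeout, unit)` returns `null` when the wait elapses with no element available, and that is the value the Scala `match` in this PR description has no case for. Below is a minimal Java sketch of the same situation. The class `PollTimeoutSketch`, its methods, and the string events are hypothetical stand-ins, not the `TestRaftServer` code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the workload generator loop; not Kafka code.
final class PollTimeoutSketch {

    // Describes what one loop iteration does with the polled event.
    static String handle(String event) {
        if (event == null) {
            // poll() timed out on an empty queue: nothing to do this iteration.
            return "idle";
        }
        switch (event) {
            case "resign":
                return "resigned";
            case "shutdown":
                return "ignored shutdown";
            default:
                return "ignored " + event;
        }
    }

    // Polls once with a short timeout and handles the result, including null.
    static String pollOnce(BlockingQueue<String> queue) {
        try {
            return handle(queue.poll(10, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(pollOnce(queue)); // empty queue, so poll returns null
        queue.add("resign");
        System.out.println(pollOnce(queue));
    }
}
```

The explicit null check plays the role of the proposed `case _` branch: a timed-out poll becomes a no-op iteration instead of an exception that stops the thread.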
[GitHub] [kafka] YiDing-Duke commented on pull request #10843: MINOR: Log formatting for exceptions during configuration related operations
YiDing-Duke commented on pull request #10843: URL: https://github.com/apache/kafka/pull/10843#issuecomment-861078081 @dajac we are good to go.
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #10843: MINOR: Log formatting for exceptions during configuration related operations
YiDing-Duke commented on a change in pull request #10843: URL: https://github.com/apache/kafka/pull/10843#discussion_r651361653 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java ## @@ -174,11 +174,15 @@ public int hashCode() { return result; } +/** + * Override toString to redact sensitive value. + * WARNING, user should be responsible to set the correct "IsSensitive" field for each config entry. Review comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
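The warning in the javadoc under review can be illustrated with a small sketch: a redacting `toString` only protects a value when the sensitivity flag was set correctly. The class `RedactingEntrySketch` and its output format are hypothetical and are not Kafka's `ConfigEntry`.

```java
// Hypothetical stand-in for a config entry; not Kafka's ConfigEntry class.
final class RedactingEntrySketch {
    private final String name;
    private final String value;
    private final boolean isSensitive;

    RedactingEntrySketch(String name, String value, boolean isSensitive) {
        this.name = name;
        this.value = value;
        this.isSensitive = isSensitive;
    }

    // Hides the value whenever the entry is flagged as sensitive.
    @Override
    public String toString() {
        String shown = isSensitive ? "Redacted" : value;
        return "Entry(name=" + name + ", value=" + shown + ", isSensitive=" + isSensitive + ")";
    }

    public static void main(String[] args) {
        // Flag set correctly: the secret never reaches the output.
        System.out.println(new RedactingEntrySketch("ssl.key.password", "hunter2", true));
        // Flag forgotten: the same secret is printed as-is.
        System.out.println(new RedactingEntrySketch("ssl.key.password", "hunter2", false));
    }
}
```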
[jira] [Updated] (KAFKA-12949) TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh
[ https://issues.apache.org/jira/browse/KAFKA-12949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ignacio Acuna updated KAFKA-12949: -- Attachment: TestRaftServer.log > TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh > - > > Key: KAFKA-12949 > URL: https://issues.apache.org/jira/browse/KAFKA-12949 > Project: Kafka > Issue Type: Bug > Components: kraft > Reporter: Ignacio Acuna > Assignee: Ignacio Acuna > Priority: Major > Attachments: TestRaftServer.log > > > Encountered the following exception when trying to run the TestRaftServer: > {code:java} > bin/test-kraft-server-start.sh --config config/kraft.properties{code} > {code:java} > [2021-06-14 17:15:43,232] ERROR [raft-workload-generator]: Error due to > (kafka.tools.TestRaftServer$RaftWorkloadGenerator) > scala.MatchError: null > at > kafka.tools.TestRaftServer$RaftWorkloadGenerator.doWork(TestRaftServer.scala:220) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > [2021-06-14 17:15:43,253] INFO [raft-workload-generator]: Stopped > (kafka.tools.TestRaftServer$RaftWorkloadGenerator){code} > That happens in the following match: > {code:java} > eventQueue.poll(eventTimeoutMs, TimeUnit.MILLISECONDS) match { > case HandleClaim(epoch) => > claimedEpoch = Some(epoch) > throttler.reset() > pendingAppends.clear() > recordCount.set(0) > case HandleResign => > claimedEpoch = None > pendingAppends.clear() > case HandleCommit(reader) => > try { > while (reader.hasNext) { > val batch = reader.next() > claimedEpoch.foreach { leaderEpoch => > handleLeaderCommit(leaderEpoch, batch) > } > } > } finally { > reader.close() > } > case HandleSnapshot(reader) => > // Ignore snapshots; only interested in records appended by this leader > reader.close() > case Shutdown => // Ignore shutdown command > } > {code} > Full log attached.
When eventQueue.poll returns null (which happens if the queue is still empty when the timeout expires), > there is no matching case, so a scala.MatchError is thrown and the raft-workload-generator thread stops processing > events. > Proposal: > Add a `case null` to the match so the raft-workload-generator can continue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12949) TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh
Ignacio Acuna created KAFKA-12949: - Summary: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh Key: KAFKA-12949 URL: https://issues.apache.org/jira/browse/KAFKA-12949 Project: Kafka Issue Type: Bug Components: kraft Reporter: Ignacio Acuna Assignee: Ignacio Acuna Encountered the following exception when trying to run the TestRaftServer: {code:java} bin/test-kraft-server-start.sh --config config/kraft.properties{code} {code:java} [2021-06-14 17:15:43,232] ERROR [raft-workload-generator]: Error due to (kafka.tools.TestRaftServer$RaftWorkloadGenerator) scala.MatchError: null at kafka.tools.TestRaftServer$RaftWorkloadGenerator.doWork(TestRaftServer.scala:220) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [2021-06-14 17:15:43,253] INFO [raft-workload-generator]: Stopped (kafka.tools.TestRaftServer$RaftWorkloadGenerator){code} That happens in the following match: {code:java} eventQueue.poll(eventTimeoutMs, TimeUnit.MILLISECONDS) match { case HandleClaim(epoch) => claimedEpoch = Some(epoch) throttler.reset() pendingAppends.clear() recordCount.set(0) case HandleResign => claimedEpoch = None pendingAppends.clear() case HandleCommit(reader) => try { while (reader.hasNext) { val batch = reader.next() claimedEpoch.foreach { leaderEpoch => handleLeaderCommit(leaderEpoch, batch) } } } finally { reader.close() } case HandleSnapshot(reader) => // Ignore snapshots; only interested in records appended by this leader reader.close() case Shutdown => // Ignore shutdown command } {code} Full log attached. When eventQueue.poll returns null (which happens if the queue is still empty when the timeout expires), there is no matching case, so a scala.MatchError is thrown and the raft-workload-generator thread stops processing events. Proposal: Add a `case null` to the match so the raft-workload-generator can continue.
[GitHub] [kafka] showuon commented on a change in pull request #10843: MINOR: Log formatting for exceptions during configuration related operations
showuon commented on a change in pull request #10843: URL: https://github.com/apache/kafka/pull/10843#discussion_r651354743 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java ## @@ -174,11 +174,15 @@ public int hashCode() { return result; } +/** + * Override toString to redact sensitive value. + * WARNING, user should be responsible to set the correct "IsSensitive" field for each config entry. Review comment: Nit: `isSensitive` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10843: MINOR: Log formatting for exceptions during configuration related operations
showuon commented on a change in pull request #10843: URL: https://github.com/apache/kafka/pull/10843#discussion_r651354059 ## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -469,7 +469,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } invalidProps.keys.foreach(props.remove) val configSource = if (perBrokerConfig) "broker" else "default cluster" -error(s"Dynamic $configSource config contains invalid values: $invalidProps, these configs will be ignored", e) +error(s"Dynamic $configSource config contains invalid values in: ${invalidProps.keys}, these configs will be ignored", e) Review comment: SGTM! Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
ijuma commented on pull request #10584: URL: https://github.com/apache/kafka/pull/10584#issuecomment-861064040 Yeah, that was unintentional; it happened while using the GitHub editor. Will add it back.
[GitHub] [kafka] jolshan commented on pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on pull request #10584: URL: https://github.com/apache/kafka/pull/10584#issuecomment-861062736 Ah, I see. I didn't need that extra declaration, but you also removed the comment. Not a huge deal though.
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r651337637 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -117,13 +128,21 @@ public MetadataRequestData data() { public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Errors error = Errors.forException(e); MetadataResponseData responseData = new MetadataResponseData(); -if (topics() != null) { -for (String topic : topics()) +if (data.topics() != null) { +for (MetadataRequestTopic topic : data.topics()) { +String topicName; +// If null, set to the empty string, since the response does not allow null. +if (topic.name() == null) +topicName = ""; +else +topicName = topic.name(); responseData.topics().add(new MetadataResponseData.MetadataResponseTopic() -.setName(topic) -.setErrorCode(error.code()) -.setIsInternal(false) -.setPartitions(Collections.emptyList())); +.setName(topicName) +.setTopicId(topic.topicId()) +.setErrorCode(error.code()) +.setIsInternal(false) +.setPartitions(Collections.emptyList())); Review comment: No. My IDE did that. I will revert that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651336157 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -119,18 +120,50 @@ public static FetchResponse parse(ByteBuffer buffer, short version) { return new FetchResponse(new FetchResponseData(new ByteBufferAccessor(buffer), version)); } +private LinkedHashMap toResponseDataMap(Map topicIdToNameMap, short version) { Review comment: I think this inconsistency existed before I touched the code. 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651335666 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -71,6 +76,12 @@ public FetchResponseData data() { return data; } +/** + * From version 3 or later, the authorized and existing entries in `FetchRequest.fetchData` should be in the same order in `responseData`. + * Version 13 introduces topic IDs which mean there may be unresolved partitions. If there is any unknown topic ID in the request, the + * response will contain a top-level UNKNOWN_TOPIC_ID error and UNKNOWN_TOPIC_ID errors on all the partitions. + * We may also return UNKNOWN_TOPIC_ID for a given partition when that partition in the session has a topic ID inconsistent with the broker. Review comment: I think I just have the wrong things here completely. There should be INCONSISTENT_TOPIC_ID here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651335255 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) { fetchTarget.id()); return; } -if (!handler.handleResponse(response)) { +if (!handler.handleResponse(response, maxVersion)) { +if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR || response.error() == Errors.UNKNOWN_TOPIC_ID) { +metadata.requestUpdate(); Review comment: The session is closed in `handler.handleResponse`.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651335121 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) { fetchTarget.id()); return; } -if (!handler.handleResponse(response)) { +if (!handler.handleResponse(response, maxVersion)) { Review comment: I see what you mean. It is a little tricky to get the version from the FetchResponse itself. Would `resp.requestHeader().apiVersion()` work? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651334106 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) { private LinkedHashMap sessionPartitions = new LinkedHashMap<>(0); +/** + * All of the topic ids mapped to topic names for topics which exist in the fetch request session. + */ +private Map sessionTopicIds = new HashMap<>(0); + +/** + * All of the topic names mapped to topic ids for topics which exist in the fetch request session. + */ +private Map sessionTopicNames = new HashMap<>(0); + +public Map sessionTopicNames() { +return sessionTopicNames; +} + +private boolean canUseTopicIds = false; Review comment: I think we already do something like this on the broker. We only get to the point of having a session if the broker had an ID for all the topics in the request. I don't think we can calculate on a request basis since we may respond with topics that did not have IDs associated. I may be misunderstanding what you are saying, but I'm very wary of trying to switch between versions 12 and 13 in the same session. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651333015 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -213,9 +335,22 @@ public FetchRequestData build() { } sessionPartitions = next; next = null; +canUseTopicIds = topicIds.keySet().containsAll(sessionPartitions.keySet().stream().map( +tp -> tp.topic()).collect(Collectors.toSet())); +// Only add topic IDs to the session if we are using topic IDs. +if (canUseTopicIds) { Review comment: That makes sense to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
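The `canUseTopicIds` check in the diff above amounts to a set-containment test: topic IDs are usable only if every topic that appears in the session partitions has a known ID. Below is a minimal sketch of that check. `java.util.UUID` stands in for Kafka's `Uuid`, plain topic-name strings stand in for the `TopicPartition` keys, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch; java.util.UUID stands in for Kafka's Uuid and plain
// topic-name strings stand in for the TopicPartition keys of sessionPartitions.
final class CanUseTopicIdsSketch {

    // True only when every topic referenced by the session has an entry in topicIds.
    static boolean canUseTopicIds(Map<String, UUID> topicIds, Collection<String> sessionPartitionTopics) {
        Set<String> topicsInSession = new HashSet<>(sessionPartitionTopics);
        return topicIds.keySet().containsAll(topicsInSession);
    }

    public static void main(String[] args) {
        Map<String, UUID> topicIds = new HashMap<>();
        topicIds.put("orders", UUID.randomUUID());
        // Two partitions of a topic with a known ID.
        System.out.println(canUseTopicIds(topicIds, Arrays.asList("orders", "orders")));
        // One partition belongs to a topic without an ID.
        System.out.println(canUseTopicIds(topicIds, Arrays.asList("orders", "payments")));
    }
}
```

Note that an empty session trivially passes the check, since `containsAll` of an empty set is true.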
[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
hachikuji commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r651330764 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ## @@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() { return snapshot.snapshotId(); } +/** + * Returns the last log offset which is represented in the snapshot. + */ +public long lastOffsetFromLog() { Review comment: Maybe something like `lastIncludedOffset` or `lastContainedOffset`? ## File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java ## @@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException { appendBatch(numberOfRecords, epoch); log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); -try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) { +try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, true).get()) { snapshot.freeze(); } RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get(); assertEquals(0, snapshot.sizeInBytes()); } +@Test +public void testCreateSnapshotValidation() { +int numberOfRecords = 10; +int firstEpoch = 1; +int secondEpoch = 3; + +appendBatch(numberOfRecords, firstEpoch); +appendBatch(numberOfRecords, secondEpoch); +log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)); + +// Test snapshot id for the first epoch +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { } +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { } +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(1, firstEpoch), true).get()) { } + +// Test snapshot id for the second epoch +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { } +try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { } 
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { } +} + +@Test +public void testCreateSnapshotLaterThanHighWatermark() { +int numberOfRecords = 10; +int epoch = 1; + +appendBatch(numberOfRecords, epoch); +log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + +assertThrows( +IllegalArgumentException.class, +() -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch), true) +); +} + +@Test +public void testCreateSnapshotBeforeLogStartOffset() { Review comment: Worth adding any test cases for an invalid epoch? ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -180,14 +181,17 @@ default void beginShutdown() {} void resign(int epoch); /** - * Create a writable snapshot file for a given offset and epoch. + * Create a writable snapshot file for a commmitted offset. Review comment: nit: one extra 'm' ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1101,7 +1101,7 @@ private boolean handleFetchResponse( partitionResponse.snapshotId().epoch() ); - state.setFetchingSnapshot(Optional.of(log.createSnapshot(snapshotId))); +state.setFetchingSnapshot(log.createSnapshot(snapshotId, false)); Review comment: Might be worth a brief comment that the snapshot is expected to be well ahead of the current log, so we have to skip validation. ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java ## @@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) { * Create a writable snapshot for the given snapshot id. * * See {@link RawSnapshotWriter} for details on how to use this object. The caller of - * this method is responsible for invoking {@link RawSnapshotWriter#close()}. + * this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a + * snapshot already exists then return an {@link Optional#empty()}. 
* * @param snapshotId the end offset and epoch that identifies the snapshot - * @return a writable snapshot + * @param validate validate the snapshot id against the log + * @return a writable snapshot if it doesn't already exists + * @throws IllegalArgumentException if validate is true and end offset is greater than the + * high-watermark + * @throws IllegalArgumentException if validate is true and end offset is
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651332908 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -186,23 +268,63 @@ public String toString() { * incremental fetch requests (see below). */ private LinkedHashMap next; +private Map topicIds; private final boolean copySessionPartitions; +private boolean missingTopicIds; Builder() { this.next = new LinkedHashMap<>(); +this.topicIds = new HashMap<>(); this.copySessionPartitions = true; } Builder(int initialSize, boolean copySessionPartitions) { this.next = new LinkedHashMap<>(initialSize); +this.topicIds = new HashMap<>(initialSize); this.copySessionPartitions = copySessionPartitions; } /** * Mark that we want data from this partition in the upcoming fetch. */ -public void add(TopicPartition topicPartition, PartitionData data) { +public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) { next.put(topicPartition, data); +// topicIds do not change between adding partitions and building, so we can use putIfAbsent +if (!topicId.equals(Uuid.ZERO_UUID)) { +topicIds.putIfAbsent(topicPartition.topic(), topicId); Review comment: Ah, I see what you are saying here. I think this will still close the session when we send the request. The other option is to set a boolean similar to `missingTopicId` (maybe just change it to `inconsistentTopicId`) that signals to close the session earlier (upon build).
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651332118 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -186,23 +268,63 @@ public String toString() { * incremental fetch requests (see below). */ private LinkedHashMap next; +private Map topicIds; private final boolean copySessionPartitions; +private boolean missingTopicIds; Builder() { this.next = new LinkedHashMap<>(); +this.topicIds = new HashMap<>(); this.copySessionPartitions = true; } Builder(int initialSize, boolean copySessionPartitions) { this.next = new LinkedHashMap<>(initialSize); +this.topicIds = new HashMap<>(initialSize); this.copySessionPartitions = copySessionPartitions; } /** * Mark that we want data from this partition in the upcoming fetch. */ -public void add(TopicPartition topicPartition, PartitionData data) { +public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) { next.put(topicPartition, data); +// topicIds do not change between adding partitions and building, so we can use putIfAbsent +if (!topicId.equals(Uuid.ZERO_UUID)) { +topicIds.putIfAbsent(topicPartition.topic(), topicId); Review comment: If we try to put in a new topic ID, the session should be closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
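One way to read the option discussed in this thread (flagging a conflicting topic ID while partitions are added, so the session can be closed at build time) is sketched below. It relies on `Map.putIfAbsent` returning the existing mapping, or `null` if there was none. The class `TopicIdTrackerSketch`, the `inconsistentTopicId` flag, and the use of `java.util.UUID` in place of Kafka's `Uuid` are assumptions for illustration, not the code in the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch; not the FetchSessionHandler.Builder from the PR.
final class TopicIdTrackerSketch {
    // Stand-in for Kafka's Uuid.ZERO_UUID sentinel meaning "no topic ID known".
    static final UUID ZERO = new UUID(0L, 0L);

    private final Map<String, UUID> topicIds = new HashMap<>();
    private boolean inconsistentTopicId = false;

    // Records the ID for a topic and raises the flag if it conflicts with an earlier one.
    void add(String topic, UUID topicId) {
        if (topicId.equals(ZERO)) {
            return; // nothing to record for the sentinel
        }
        // putIfAbsent returns the existing mapping, or null if there was none.
        UUID previous = topicIds.putIfAbsent(topic, topicId);
        if (previous != null && !previous.equals(topicId)) {
            inconsistentTopicId = true;
        }
    }

    boolean inconsistentTopicId() {
        return inconsistentTopicId;
    }

    public static void main(String[] args) {
        TopicIdTrackerSketch tracker = new TopicIdTrackerSketch();
        tracker.add("orders", new UUID(1L, 1L));
        tracker.add("orders", new UUID(1L, 1L)); // same ID again: still consistent
        System.out.println(tracker.inconsistentTopicId());
        tracker.add("orders", new UUID(2L, 2L)); // different ID for the same topic
        System.out.println(tracker.inconsistentTopicId());
    }
}
```

A caller building the request could consult `inconsistentTopicId()` and close the session before sending, rather than waiting for the broker to reject the request.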
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651331636 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -412,6 +412,12 @@ abstract class AbstractFetcherThread(name: String, "expected to persist.") partitionsWithError += topicPartition +case Errors.INCONSISTENT_TOPIC_ID => Review comment: This is no longer a partition level error. We can only get it as a top level error. If it is a top level error, I believe we return an empty map and do not go down this code path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651331247 ## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ## @@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String, try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] - if (!fetchSessionHandler.handleResponse(fetchResponse)) { -Map.empty + if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) { +if (fetchResponse.error() == Errors.UNKNOWN_TOPIC_ID) + throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.") +else if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) + throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.") +else + Map.empty } else { -fetchResponse.responseData.asScala +fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala } } catch { + case unknownId: UnknownTopicIdException => +throw unknownId + case sessionUnknownId: FetchSessionTopicIdException => +throw sessionUnknownId Review comment: This happens inside of `fetchSessionHandler.handleResponse`. We set the session to close upon the next request. The code path for Fetcher is slightly different so it made sense for that code to have it there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651329577 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -945,10 +945,12 @@ private boolean hasValidClusterId(String requestClusterId) { return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); } -if (!hasValidTopicPartition(request, log.topicPartition())) { +if (!hasValidTopicPartition(request, log.topicPartition(), log.topicId())) { Review comment: Are you referring to creating a new topic ID for the metadata topic? For now, we are simply using the sentinel ID. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651323500 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); +} else { +RestartPlan plan = maybePlan.get(); +callback.onCompletion(null, plan.restartConnectorStateInfo()); +} +} else { +callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); +} + +return null; +}, +forwardErrorCallback(callback) +); +} + +/** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. 
+ */ +private synchronized void processRestartRequests() { Review comment: The new iterator-based implementation makes this explicit. Could you please check whether it looks good now?
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651323170 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -186,6 +192,10 @@ private short currentProtocolVersion; private short backoffRetries; +// visible for testing +// The pending restart requests for the connectors; +final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>(); Review comment: Fixed. Could you please check whether it looks good now?
[GitHub] [kafka] hachikuji commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`
hachikuji commented on a change in pull request #10863: URL: https://github.com/apache/kafka/pull/10863#discussion_r651224002 ## File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala ## @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator.group + +import kafka.server.DelayedOperation + +/** + * Delayed rebalance operation that is added to the purgatory when is completing the rebalance. + * + * Whenever a SyncGroup is receives, checks that we received all the SyncGroup request from Review comment: nit: Whenever a SyncGroup is receive**d**? ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int, group.maybeInvokeJoinCallback(member, joinResult) completeAndScheduleNextHeartbeatExpiration(group, member) member.isNew = false + +group.addPendingSyncMember(member.memberId) } + + schedulePendingSync(group) } } } } + private def maybeRemovePendingSyncMember( Review comment: nit: `removePendingSyncMember` throws an exception if the member is not in the group, so does the "maybe" in the name make sense? 
## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int, group.maybeInvokeJoinCallback(member, joinResult) completeAndScheduleNextHeartbeatExpiration(group, member) member.isNew = false + +group.addPendingSyncMember(member.memberId) } + + schedulePendingSync(group) } } } } + private def maybeRemovePendingSyncMember( +group: GroupMetadata, +memberId: String + ): Unit = { +group.removePendingSyncMember(memberId) +maybeCompleteSyncExpiration(group) + } + + private def removeSyncExpiration( +group: GroupMetadata + ): Unit = { +group.clearPendingSyncMembers() +maybeCompleteSyncExpiration(group) + } + + private def maybeCompleteSyncExpiration( +group: GroupMetadata + ): Unit = { +val groupKey = GroupKey(group.groupId) +syncPurgatory.checkAndComplete(groupKey) + } + + private def schedulePendingSync( +group: GroupMetadata + ): Unit = { +val delayedSync = new DelayedSync(this, group, group.rebalanceTimeoutMs) +val groupKey = GroupKey(group.groupId) +syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey)) + } + + def tryCompletePendingSync( +group: GroupMetadata, +forceComplete: () => Boolean + ): Boolean = { +group.inLock { + group.currentState match { +case Dead | Empty | PreparingRebalance => + forceComplete() +case CompletingRebalance | Stable => + if (group.hasReceivedSyncFromAllMembers()) +forceComplete() + else false + } +} + } + + def onExpirePendingSync( +group: GroupMetadata + ): Unit = { +group.inLock { + group.currentState match { +case Dead | Empty | PreparingRebalance => + debug(s"Received unexpected notification of sync expiration after group ${group.groupId} " + +s"already transitioned to the ${group.currentState} state.") + +case CompletingRebalance | Stable => + if (!group.hasAllMembersJoined) { +val pendingSyncMembers = group.allPendingSyncMembers() + +info(s"Group ${group.groupId} removed members who haven't " + Review comment: nit: this message might be 
redundant given the one in `prepareRebalance`. ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int, group.maybeInvokeJoinCallback(member, joinResult) completeAndScheduleNextHeartbeatExpiration(group, member) member.isNew = false + +group.addPendingSyncMember(member.memberId) } + + schedulePendingSync(group) } } } } + private def maybeRemove
[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r649585182 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -817,20 +839,26 @@ class KafkaApis(val requestChannel: RequestChannel, def createResponse(throttleTimeMs: Int): FetchResponse = { // Down-convert messages for each partition if required val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] -unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) => - val error = Errors.forCode(unconvertedPartitionData.errorCode) - if (error != Errors.NONE) -debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " + - s"on partition $tp failed due to ${error.exceptionName}") - convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData)) +unconvertedFetchResponse.data().responses().forEach { topicResponse => + topicResponse.partitions().forEach{ unconvertedPartitionData => Review comment: Space after forEach. ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -593,19 +619,22 @@ class FetchSessionCache(private val maxEntries: Int, * @param nowThe current time in milliseconds. * @param privileged True if the new entry we are trying to create is privileged. * @param size The number of cached partitions in the new entry we are trying to create. -* @param createPartitions A callback function which creates the map of cached partitions. +* @param versionThe version of the request +* @param createPartitions A callback function which creates the map of cached partitions and the mapping from +* topic name for topic ID for the topics. Review comment: from topic name for topic ID => from topic name to topic ID ? 
## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -213,9 +335,22 @@ public FetchRequestData build() { } sessionPartitions = next; next = null; +canUseTopicIds = topicIds.keySet().containsAll(sessionPartitions.keySet().stream().map( +tp -> tp.topic()).collect(Collectors.toSet())); +// Only add topic IDs to the session if we are using topic IDs. +if (canUseTopicIds) { Review comment: Should we set sessionTopicIds and sessionTopicNames to empty map if canUseTopicIds is false? ## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ## @@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String, try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] - if (!fetchSessionHandler.handleResponse(fetchResponse)) { -Map.empty + if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) { +if (fetchResponse.error() == Errors.UNKNOWN_TOPIC_ID) + throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.") +else if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) + throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.") +else + Map.empty } else { -fetchResponse.responseData.asScala +fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala } } catch { + case unknownId: UnknownTopicIdException => +throw unknownId + case sessionUnknownId: FetchSessionTopicIdException => +throw sessionUnknownId Review comment: If we get FetchSessionTopicIdException, the existing session is going to be invalid. So, it seems that we should start a new session? 
The same thing seems to apply to UnknownTopicIdException ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -80,19 +91,9 @@ public Errors error() { return Errors.forCode(data.errorCode()); } -public LinkedHashMap responseData() { -if (responseData == null) { -synchronized (this) { -if (responseData == null) { -responseData = new LinkedHashMap<>(); -data.responses().forEach(topicResponse -> -topicResponse.partitions().forEach(partition -> -responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition)) -);
[GitHub] [kafka] cmccabe commented on pull request #10804: KAFKA-12877: Make flexibleVersions mandatory
cmccabe commented on pull request #10804: URL: https://github.com/apache/kafka/pull/10804#issuecomment-861030811 Seems like there was a Jenkins infra issue on the last test run. ``` [2021-06-14T20:15:25.946Z] Execution failed for task ':streams:streams-scala:compileScala'. [2021-06-14T20:15:25.946Z] > Timeout waiting to lock zinc-1.3.5_2.13.6_16 compiler cache (/home/jenkins/.gradle/caches/7.0.2/zinc-1.3.5_2.13.6_16). It is currently in use by another Gradle instance. [2021-06-14T20:15:25.946Z] Owner PID: 36970 [2021-06-14T20:15:25.946Z] Our PID: 37256 [2021-06-14T20:15:25.946Z] Owner Operation: [2021-06-14T20:15:25.946Z] Our operation: [2021-06-14T20:15:25.946Z] Lock file: /home/jenkins/.gradle/caches/7.0.2/zinc-1.3.5_2.13.6_16/zinc-1.3.5_2.13.6_16.lock ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10867: KAFKA-12931: KIP-746: Revise KRaft Metadata Records
cmccabe commented on pull request #10867: URL: https://github.com/apache/kafka/pull/10867#issuecomment-861030196 The test failure is `org.apache.kafka.streams.integration.TaskMetadataIntegrationTest`, which is not related.
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r65129 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); +} else { +RestartPlan plan = maybePlan.get(); +callback.onCompletion(null, plan.restartConnectorStateInfo()); +} +} else { +callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); +} + +return null; +}, +forwardErrorCallback(callback) +); +} + +/** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. 
+ */ +private synchronized void processRestartRequests() { Review comment: We can't make a copy because we are calling `pollFirst`, which removes each request from the set: `while ((request = pendingRestartRequests.pollFirst()) != null) {`. Synchronizing the whole method was a deliberate choice to keep the code simple. One more point: this method only triggers the restart of the connector and tasks, and the real start happens in another thread, so it should be pretty fast.
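The drain-by-`pollFirst` pattern described in this comment can be sketched in isolation. This is a minimal illustration and not the herder code: the class `PendingRestartDrain`, its methods, and the use of plain connector names in place of `RestartRequest` objects are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for the herder's pending restart bookkeeping.
public class PendingRestartDrain {
    // Connector names stand in for RestartRequest objects (assumption for brevity).
    private final NavigableSet<String> pending = new TreeSet<>();
    private final List<String> processed = new ArrayList<>();

    // Adding the same connector twice keeps a single pending entry.
    public synchronized void add(String connectorName) {
        pending.add(connectorName);
    }

    // pollFirst() removes and returns the lowest element, or null when the set is empty,
    // so the loop drains the set without needing a copy. Holding the lock for the whole
    // loop means no request can be added halfway through the drain.
    public synchronized void processAll() {
        String name;
        while ((name = pending.pollFirst()) != null) {
            processed.add(name);
        }
    }

    public synchronized List<String> processed() {
        return new ArrayList<>(processed);
    }

    public synchronized boolean isEmpty() {
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        PendingRestartDrain drain = new PendingRestartDrain();
        drain.add("sink-b");
        drain.add("source-a");
        drain.add("sink-b"); // duplicate, ignored by the set
        drain.processAll();
        System.out.println(drain.processed() + " remaining empty: " + drain.isEmpty());
    }
}
```

Because each iteration only records (or, in the real method, only triggers) work, the lock is held briefly even though it covers the whole loop.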
[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
ijuma commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r651308552 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -117,13 +128,21 @@ public MetadataRequestData data() { public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Errors error = Errors.forException(e); MetadataResponseData responseData = new MetadataResponseData(); -if (topics() != null) { -for (String topic : topics()) +if (data.topics() != null) { +for (MetadataRequestTopic topic : data.topics()) { +String topicName; +// If null, set to the empty string, since the response does not allow null. +if (topic.name() == null) +topicName = ""; +else +topicName = topic.name(); Review comment: Can we use the ternary operator here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
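For illustration, the ternary form suggested here might look like the following. It is a sketch only: `TopicNameDefault` and `nameOrEmpty` are hypothetical names, and in the PR the expression would presumably be applied to `topic.name()` inside the loop.

```java
public class TopicNameDefault {
    // Hypothetical helper: the response does not allow a null name,
    // so a null input falls back to the empty string.
    static String nameOrEmpty(String name) {
        return name == null ? "" : name;
    }

    public static void main(String[] args) {
        System.out.println("[" + nameOrEmpty(null) + "]");
        System.out.println("[" + nameOrEmpty("orders") + "]");
    }
}
```

This replaces the four-line `if`/`else` assignment with a single expression and lets the local variable be declared and initialized in one statement.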
[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
ijuma commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r651308210 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -117,13 +128,21 @@ public MetadataRequestData data() { public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Errors error = Errors.forException(e); MetadataResponseData responseData = new MetadataResponseData(); -if (topics() != null) { -for (String topic : topics()) +if (data.topics() != null) { +for (MetadataRequestTopic topic : data.topics()) { +String topicName; +// If null, set to the empty string, since the response does not allow null. +if (topic.name() == null) +topicName = ""; +else +topicName = topic.name(); responseData.topics().add(new MetadataResponseData.MetadataResponseTopic() -.setName(topic) -.setErrorCode(error.code()) -.setIsInternal(false) -.setPartitions(Collections.emptyList())); +.setName(topicName) +.setTopicId(topic.topicId()) +.setErrorCode(error.code()) +.setIsInternal(false) +.setPartitions(Collections.emptyList())); Review comment: Is there a reason to change the indent? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2
[ https://issues.apache.org/jira/browse/KAFKA-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-12379: --- Fix Version/s: 3.0.0 > KIP-716: Allow configuring the location of the offsetsync topic with > MirrorMaker2 > - > > Key: KAFKA-12379 > URL: https://issues.apache.org/jira/browse/KAFKA-12379 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > Fix For: 3.0.0 > > > Ticket for KIP-716 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offset-syncs+topic+with+MirrorMaker2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2
[ https://issues.apache.org/jira/browse/KAFKA-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-12379: --- Labels: needs-kip (was: ) > KIP-716: Allow configuring the location of the offsetsync topic with > MirrorMaker2 > - > > Key: KAFKA-12379 > URL: https://issues.apache.org/jira/browse/KAFKA-12379 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > Labels: needs-kip > Fix For: 3.0.0 > > > Ticket for KIP-716 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offset-syncs+topic+with+MirrorMaker2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651300080 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ## @@ -255,12 +257,29 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto @POST @Path("/{connector}/restart") -public void restartConnector(final @PathParam("connector") String connector, +public Response restartConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, + final @DefaultValue ("false") @QueryParam("includeTasks") Boolean includeTasks, + final @DefaultValue ("false") @QueryParam("onlyFailed") Boolean onlyFailed, final @QueryParam("forward") Boolean forward) throws Throwable { -FutureCallback cb = new FutureCallback<>(); -herder.restartConnector(connector, cb); -completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward); +RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks); +if (restartRequest.forciblyRestartConnectorOnly()) { +// For backward compatibility, just restart the connector instance and return OK with no body +FutureCallback cb = new FutureCallback<>(); +herder.restartConnector(connector, cb); +completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward); +return Response.ok().build(); +} + +FutureCallback cb = new FutureCallback<>(); +herder.restartConnectorAndTasks(restartRequest, cb); +Map queryParameters = new HashMap<>(); +queryParameters.put("includeTasks", String.valueOf(includeTasks)); +queryParameters.put("onlyFailed", String.valueOf(onlyFailed)); +String forwardingPath = "/connectors/" + connector + "/restart"; Review comment: Good idea. Fixed.
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651293447 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); +} else { +RestartPlan plan = maybePlan.get(); +callback.onCompletion(null, plan.restartConnectorStateInfo()); +} +} else { +callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); +} + +return null; +}, +forwardErrorCallback(callback) +); +} + +/** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. 
+ */ +private synchronized void processRestartRequests() { +RestartRequest request; +while ((request = pendingRestartRequests.pollFirst()) != null) { +doRestartConnectorAndTasks(request); +} +} + +protected synchronized boolean doRestartConnectorAndTasks(RestartRequest request) { +final String connectorName = request.connectorName(); +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request); +return false; +} +RestartPlan plan = maybePlan.get(); +log.info("Executing {}", plan); + + +// If requested, stop the connector and any tasks, marking each as restarting +final ExtendedAssignment currentAssignments = assignment; +final Collection assignedIdsToRestart = plan.taskIdsToRestart() + .stream() + .filter(taskId -> currentAssignments.tasks().contains(taskId)) + .collect(Collectors.toList()); +final boolean restartConnector = plan.restartConnector() && currentAssignments.connectors().contains(connectorName); +final boolean restartTasks = !assignedIdsToRestart.isEmpty(); +if (restartConnector) { +worker.stopAndAwaitConnector(connectorName); +recordRestarting(connectorName); +} +if (restartTasks) { +// Stop the tasks and mark as restarting +worker.stopAndAwaitTasks(assignedIdsToRestart); +assignedIdsToRestart.forEach(this::recordRestarting); +} + +// Now restart the connector and tasks +if (restartConnector) { +startConnector(connectorName, (error, targetState) -> { +if (error == null) { +log.info("Connector {} successfully restarted", connectorName); +} else { +log.error("Failed to restart connector '" + connectorName + "'", error); +} +}); +} +if (restartTasks) { +log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount
[jira] [Commented] (KAFKA-12948) NetworkClient.close(node) with node in connecting state makes NetworkClient unusable
[ https://issues.apache.org/jira/browse/KAFKA-12948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363227#comment-17363227 ] Ismael Juma commented on KAFKA-12948: - Good catch. So, this regressed in 2.7.0? > NetworkClient.close(node) with node in connecting state makes NetworkClient > unusable > > > Key: KAFKA-12948 > URL: https://issues.apache.org/jira/browse/KAFKA-12948 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 2.8.0, 2.7.1 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 2.7.2, 2.8.1 > > > `NetworkClient.close(node)` closes the node and removes it from > `ClusterConnectionStates.nodeState`, but not from > `ClusterConnectionStates.connectingNodes`. Subsequent `NetworkClient.poll()` > invocations throw IllegalStateException and this leaves the NetworkClient in > an unusable state until the node is removed from connectingNodes or added to > nodeState. We don't use `NetworkClient.close(node)` in clients, but we use it > in clients started by brokers for replica fetcher and controller. Since > brokers use NetworkClientUtils.isReady() before establishing connections and > this invokes poll(), the NetworkClient never recovers.
> Exception stack trace: > {code:java} > java.lang.IllegalStateException: No entry found for connection 0 > at > org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:409) > at > org.apache.kafka.clients.ClusterConnectionStates.isConnectionSetupTimeout(ClusterConnectionStates.java:446) > at > org.apache.kafka.clients.ClusterConnectionStates.lambda$nodesWithConnectionSetupTimeout$0(ClusterConnectionStates.java:458) > at > java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174) > at > java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553) > at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.kafka.clients.ClusterConnectionStates.nodesWithConnectionSetupTimeout(ClusterConnectionStates.java:459) > at > org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:807) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) > at > org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
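The failure mode described in the ticket can be reproduced with a small model. This is a simplified sketch, not the Kafka implementation: `ConnectionStatesModel`, `buggyClose`, `fixedClose`, and `checkConnecting` are hypothetical names, and clearing both collections is only one possible fix, assumed here for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of two collections that must stay in sync.
public class ConnectionStatesModel {
    private final Map<String, String> nodeState = new HashMap<>();
    private final Set<String> connectingNodes = new HashSet<>();

    void connecting(String id) {
        nodeState.put(id, "CONNECTING");
        connectingNodes.add(id);
    }

    // Models the reported behavior: only nodeState is cleaned up.
    void buggyClose(String id) {
        nodeState.remove(id);
    }

    // One possible fix (assumption): remove the node from both collections.
    void fixedClose(String id) {
        nodeState.remove(id);
        connectingNodes.remove(id);
    }

    // Models the connection-setup timeout scan that runs on every poll():
    // each connecting node must still have a state entry.
    int checkConnecting() {
        int checked = 0;
        for (String id : connectingNodes) {
            if (!nodeState.containsKey(id)) {
                throw new IllegalStateException("No entry found for connection " + id);
            }
            checked++;
        }
        return checked;
    }

    public static void main(String[] args) {
        ConnectionStatesModel buggy = new ConnectionStatesModel();
        buggy.connecting("0");
        buggy.buggyClose("0");
        try {
            buggy.checkConnecting();
        } catch (IllegalStateException e) {
            // The stale entry makes every later scan fail the same way.
            System.out.println("buggy close: " + e.getMessage());
        }

        ConnectionStatesModel fixed = new ConnectionStatesModel();
        fixed.connecting("0");
        fixed.fixedClose("0");
        System.out.println("fixed close, nodes scanned: " + fixed.checkConnecting());
    }
}
```

In the model, the stale entry in `connectingNodes` is never removed by the scan itself, which mirrors why the client in the report never recovers on its own.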
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651298044 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -186,6 +192,10 @@ private short currentProtocolVersion; private short backoffRetries; +// visible for testing +// The pending restart requests for the connectors; +final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>(); Review comment: No particular reason; TreeSet was the NavigableSet implementation that came to mind. But I like your idea above of using a map and simplifying the code. Let me work on it.
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r65129 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); +} else { +RestartPlan plan = maybePlan.get(); +callback.onCompletion(null, plan.restartConnectorStateInfo()); +} +} else { +callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); +} + +return null; +}, +forwardErrorCallback(callback) +); +} + +/** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. 
+ */ +private synchronized void processRestartRequests() { Review comment: We can't make a copy because the loop calls `pollFirst()`, which removes each element from the set: `while ((request = pendingRestartRequests.pollFirst()) != null) {` Synchronizing the whole method was a deliberate choice to keep the code simple.
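The drain-by-polling pattern described in this comment can be sketched in isolation. This is a minimal illustration under stated assumptions, not the herder's code: `PendingRestartDrainSketch`, its `add` and `drain` methods, and the use of plain connector-name strings in place of `RestartRequest` are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class PendingRestartDrainSketch {
    // Hypothetical stand-in for the pending set: plain connector names
    // instead of RestartRequest objects.
    private final NavigableSet<String> pending = new TreeSet<>();

    // Producers add under the same monitor that drain() holds.
    public synchronized void add(String connectorName) {
        pending.add(connectorName);
    }

    // pollFirst() removes and returns the lowest element, or null when the
    // set is empty, so the loop empties the set without taking a copy.
    public synchronized List<String> drain() {
        List<String> processed = new ArrayList<>();
        String next;
        while ((next = pending.pollFirst()) != null) {
            processed.add(next);
        }
        return processed;
    }

    public static void main(String[] args) {
        PendingRestartDrainSketch sketch = new PendingRestartDrainSketch();
        sketch.add("sink-b");
        sketch.add("source-a");
        sketch.add("sink-b"); // same name again: the set still holds one entry
        System.out.println(sketch.drain()); // [sink-b, source-a]
        System.out.println(sketch.drain()); // []
    }
}
```

Because `add` and `drain` share one monitor, nothing can be added while a drain is in progress, which matches the simplicity argument made in the comment.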
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651293447 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); +} else { +RestartPlan plan = maybePlan.get(); +callback.onCompletion(null, plan.restartConnectorStateInfo()); +} +} else { +callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); +} + +return null; +}, +forwardErrorCallback(callback) +); +} + +/** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. 
+ */ +private synchronized void processRestartRequests() { +RestartRequest request; +while ((request = pendingRestartRequests.pollFirst()) != null) { +doRestartConnectorAndTasks(request); +} +} + +protected synchronized boolean doRestartConnectorAndTasks(RestartRequest request) { +final String connectorName = request.connectorName(); +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request); +return false; +} +RestartPlan plan = maybePlan.get(); +log.info("Executing {}", plan); + + +// If requested, stop the connector and any tasks, marking each as restarting +final ExtendedAssignment currentAssignments = assignment; +final Collection assignedIdsToRestart = plan.taskIdsToRestart() + .stream() + .filter(taskId -> currentAssignments.tasks().contains(taskId)) + .collect(Collectors.toList()); +final boolean restartConnector = plan.restartConnector() && currentAssignments.connectors().contains(connectorName); +final boolean restartTasks = !assignedIdsToRestart.isEmpty(); +if (restartConnector) { +worker.stopAndAwaitConnector(connectorName); +recordRestarting(connectorName); +} +if (restartTasks) { +// Stop the tasks and mark as restarting +worker.stopAndAwaitTasks(assignedIdsToRestart); +assignedIdsToRestart.forEach(this::recordRestarting); +} + +// Now restart the connector and tasks +if (restartConnector) { +startConnector(connectorName, (error, targetState) -> { +if (error == null) { +log.info("Connector {} successfully restarted", connectorName); +} else { +log.error("Failed to restart connector '" + connectorName + "'", error); +} +}); +} +if (restartTasks) { +log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651292365 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map connectorProps, Callback }); } +/** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. + * + * @param request the restart request; may not be null + * @return the restart plan, or empty this worker has no status for the connector named in the request and therefore the Review comment: Fixed ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map connectorProps, Callback }); } +/** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. 
+ * + * @param request the restart request; may not be null + * @return the restart plan, or empty this worker has no status for the connector named in the request and therefore the + * connector cannot be restarted + */ +public Optional buildRestartPlanFor(RestartRequest request) { +String connectorName = request.connectorName(); +ConnectorStatus connectorStatus = statusBackingStore.get(connectorName); +if (connectorStatus == null) { +return Optional.empty(); +} + +// If requested, mark the connector as restarting +AbstractStatus.State connectorState; +if (request.includeConnector(connectorStatus)) { +connectorState = AbstractStatus.State.RESTARTING; +} else { +connectorState = connectorStatus.state(); +} +ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState( +connectorState.toString(), +connectorStatus.workerId(), +connectorStatus.trace() +); + +// Collect the task IDs to stop and restart (may be none) Review comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651291878 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); +} else { +RestartPlan plan = maybePlan.get(); +callback.onCompletion(null, plan.restartConnectorStateInfo()); +} +} else { +callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); Review comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651291754 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); +} else { +RestartPlan plan = maybePlan.get(); +callback.onCompletion(null, plan.restartConnectorStateInfo()); +} +} else { +callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); +} + +return null; +}, +forwardErrorCallback(callback) +); +} + +/** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. 
+ */ +private synchronized void processRestartRequests() { +RestartRequest request; +while ((request = pendingRestartRequests.pollFirst()) != null) { +doRestartConnectorAndTasks(request); +} +} + +protected synchronized boolean doRestartConnectorAndTasks(RestartRequest request) { +final String connectorName = request.connectorName(); +Optional maybePlan = buildRestartPlanFor(request); +if (!maybePlan.isPresent()) { +log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request); +return false; +} +RestartPlan plan = maybePlan.get(); +log.info("Executing {}", plan); + + +// If requested, stop the connector and any tasks, marking each as restarting +final ExtendedAssignment currentAssignments = assignment; +final Collection assignedIdsToRestart = plan.taskIdsToRestart() + .stream() + .filter(taskId -> currentAssignments.tasks().contains(taskId)) + .collect(Collectors.toList()); +final boolean restartConnector = plan.restartConnector() && currentAssignments.connectors().contains(connectorName); +final boolean restartTasks = !assignedIdsToRestart.isEmpty(); +if (restartConnector) { +worker.stopAndAwaitConnector(connectorName); +recordRestarting(connectorName); +} +if (restartTasks) { +// Stop the tasks and mark as restarting +worker.stopAndAwaitTasks(assignedIdsToRestart); +assignedIdsToRestart.forEach(this::recordRestarting); +} + +// Now restart the connector and tasks +if (restartConnector) { +startConnector(connectorName, (error, targetState) -> { +if (error == null) { +log.info("Connector {} successfully restarted", connectorName); Review comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r651291606 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java ## @@ -58,6 +58,9 @@ public void start(Map props) { commonConfigs = props; log.info("Started {} connector {}", this.getClass().getSimpleName(), connectorName); connectorHandle.recordConnectorStart(); +if ("true".equalsIgnoreCase(props.getOrDefault("connector.start.inject.error", "false"))) { Review comment: done
[GitHub] [kafka] kpatelatwork commented on pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs
kpatelatwork commented on pull request #10841: URL: https://github.com/apache/kafka/pull/10841#issuecomment-860983059 @rhauch I have resolved all the review comments. Could you please check to see if it looks good now?
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs
kpatelatwork commented on a change in pull request #10841: URL: https://github.com/apache/kafka/pull/10841#discussion_r651267237 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java ## @@ -56,6 +57,39 @@ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " ); +@Test +public void testListenersConfigAllowedValues() { +Map props = baseProps(); + +// no value set for "listeners" +WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG)); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:")); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, https://a.b:7812"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:", "https://a.b:7812")); + +new WorkerConfig(WorkerConfig.baseConfigDef(), props); +} + +@Test +public void testListenersConfigNotAllowedValues() { +Map props = baseProps(); + +props.put(WorkerConfig.LISTENERS_CONFIG, ""); +assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props)).printStackTrace(); Review comment: Very good suggestion, I added that case.
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs
kpatelatwork commented on a change in pull request #10841: URL: https://github.com/apache/kafka/pull/10841#discussion_r651267091 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java ## @@ -56,6 +57,39 @@ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " ); +@Test +public void testListenersConfigAllowedValues() { +Map props = baseProps(); + +// no value set for "listeners" +WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG)); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:")); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, https://a.b:7812"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:", "https://a.b:7812")); + +new WorkerConfig(WorkerConfig.baseConfigDef(), props); +} + +@Test +public void testListenersConfigNotAllowedValues() { +Map props = baseProps(); Review comment: Fixed
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs
kpatelatwork commented on a change in pull request #10841: URL: https://github.com/apache/kafka/pull/10841#discussion_r651266585 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java ## @@ -56,6 +57,39 @@ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " ); +@Test +public void testListenersConfigAllowedValues() { +Map props = baseProps(); + +// no value set for "listeners" +WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG)); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:")); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, https://a.b:7812"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:", "https://a.b:7812")); + +new WorkerConfig(WorkerConfig.baseConfigDef(), props); +} + +@Test +public void testListenersConfigNotAllowedValues() { +Map props = baseProps(); + +props.put(WorkerConfig.LISTENERS_CONFIG, ""); +assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props)).printStackTrace(); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:,"); +assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props)); Review comment: Fixed. I added "listeners" and "admin.listeners" to each message. Could you please check to see if it looks good now?
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs
kpatelatwork commented on a change in pull request #10841: URL: https://github.com/apache/kafka/pull/10841#discussion_r651266416 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java ## @@ -56,6 +57,39 @@ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " ); +@Test +public void testListenersConfigAllowedValues() { +Map props = baseProps(); + +// no value set for "listeners" +WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG)); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:")); Review comment: This was a copy-paste mistake from the other test; I fixed both tests. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java ## @@ -56,6 +57,39 @@ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " ); +@Test +public void testListenersConfigAllowedValues() { +Map props = baseProps(); + +// no value set for "listeners" +WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG)); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:")); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, https://a.b:7812"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:", "https://a.b:7812")); + +new WorkerConfig(WorkerConfig.baseConfigDef(), props); +} + +@Test +public void testListenersConfigNotAllowedValues() { +Map props = baseProps(); + 
+props.put(WorkerConfig.LISTENERS_CONFIG, ""); +assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props)).printStackTrace(); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:,"); +assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props)); Review comment: Fixed
[jira] [Commented] (KAFKA-12946) __consumer_offsets topic with very big partitions
[ https://issues.apache.org/jira/browse/KAFKA-12946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363203#comment-17363203 ] Emi commented on KAFKA-12946: - [~rndgstn] What do you mean by "has a significantly smaller size than the leader"? Thanks > __consumer_offsets topic with very big partitions > - > > Key: KAFKA-12946 > URL: https://issues.apache.org/jira/browse/KAFKA-12946 > Project: Kafka > Issue Type: Bug > Components: log cleaner > Affects Versions: 2.0.0 > Reporter: Emi > Priority: Critical > > I am using Kafka 2.0.0 with Java 8u191. > There is one partition of the __consumer_offsets topic that is 600 GB, with 6000 segments older than 4 months. Other partitions of that topic are small: 20-30 MB. > There are 60 consumer groups, 90 topics and 100 partitions per topic. > There aren't errors in the logs. From the log cleaner's log, I can see that this partition is never touched by the log cleaner thread for compaction; it only gets new segments added. > How is this possible? > There was another partition with the same problem, but after some months it was compacted. Now there is only one partition with this problem, but it is bigger and keeps growing. > I have used the kafka-dump-log tool to check these old segments and I can see many duplicates, so I would assume that it is not being compacted. > My settings: > {{offsets.commit.required.acks = -1}} > {{offsets.commit.timeout.ms = 5000}} > {{offsets.load.buffer.size = 5242880}} > {{offsets.retention.check.interval.ms = 60}} > {{offsets.retention.minutes = 10080}} > {{offsets.topic.compression.codec = 0}} > {{offsets.topic.num.partitions = 50}} > {{offsets.topic.replication.factor = 3}} > {{offsets.topic.segment.bytes = 104857600}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram opened a new pull request #10882: KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed
rajinisivaram opened a new pull request #10882: URL: https://github.com/apache/kafka/pull/10882 `NetworkClient.poll()` throws `IllegalStateException` while checking `isConnectionSetupTimeout` if any node in `ClusterConnectionStates.connectingNodes` is not present in `ClusterConnectionStates.nodeState`. When we remove a node from `nodeState`, we should also remove it from `connectingNodes`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
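The invariant described in this pull request (every id in `connectingNodes` must also have an entry in `nodeState`) can be illustrated with a small self-contained sketch. The class `ConnectionStatesSketch` and its methods are hypothetical and only mimic the shape of the problem; they are not the `ClusterConnectionStates` implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ConnectionStatesSketch {
    // Hypothetical, simplified bookkeeping: per-node state plus a separate
    // set of the nodes that are currently connecting.
    private final Map<String, String> nodeState = new HashMap<>();
    private final Set<String> connectingNodes = new HashSet<>();

    public void connecting(String id) {
        nodeState.put(id, "CONNECTING");
        connectingNodes.add(id);
    }

    // Removal that forgets the second collection and so breaks the invariant.
    public void removeStateOnly(String id) {
        nodeState.remove(id);
    }

    // Removal that keeps both collections consistent.
    public void remove(String id) {
        nodeState.remove(id);
        connectingNodes.remove(id);
    }

    // Walks connectingNodes and looks up each state, failing on a missing
    // entry in the same way the reported exception does.
    public int countConnecting() {
        int count = 0;
        for (String id : connectingNodes) {
            if (nodeState.get(id) == null) {
                throw new IllegalStateException("No entry found for connection " + id);
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        ConnectionStatesSketch fixed = new ConnectionStatesSketch();
        fixed.connecting("0");
        fixed.remove("0");
        System.out.println(fixed.countConnecting()); // 0

        ConnectionStatesSketch broken = new ConnectionStatesSketch();
        broken.connecting("0");
        broken.removeStateOnly("0");
        try {
            broken.countConnecting();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // No entry found for connection 0
        }
    }
}
```

Once the two collections disagree, every later scan fails until they are reconciled, which is why removing the node from both places in a single method is the simplest way to keep the invariant.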
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs
kpatelatwork commented on a change in pull request #10841: URL: https://github.com/apache/kafka/pull/10841#discussion_r651266225 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -497,6 +476,37 @@ static void validateHeaderConfigAction(String action) { + "Expected one of %s", action, HEADER_ACTIONS)); } } +private static class ListenersValidator implements ConfigDef.Validator { +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +throw new ConfigException("Invalid value, at least one URI is expected, ex: http://localhost:8080,https://localhost:8443."); +} + +if (!(value instanceof List)) { +throw new ConfigException("Invalid value type (list expected)."); +} + +List items = (List) value; +if (items.isEmpty()) { +throw new ConfigException("Invalid value, at least one URI is expected, ex: http://localhost:8080,https://localhost:8443."); +} + +for (Object item: items) { +if (!(item instanceof String)) { +throw new ConfigException("Invalid type for listener (expected String)."); +} +if (Utils.isBlank((String) item)) { +throw new ConfigException("Empty listener found when parsing list."); Review comment: Fixed. It was a copy-paste mistake from AdminListenerValidator, which I have fixed as well. Would you please also review the changes to that class and let me know whether it is OK to change the exception message there, or whether that would be backward incompatible?
[GitHub] [kafka] erdody commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
erdody commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r649771532 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map connectorProps, Callback }); } +/** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. + * + * @param request the restart request; may not be null + * @return the restart plan, or empty this worker has no status for the connector named in the request and therefore the Review comment: Nit: or empty **if** this worker ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ## @@ -255,12 +257,29 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto @POST @Path("/{connector}/restart") -public void restartConnector(final @PathParam("connector") String connector, +public Response restartConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, + final @DefaultValue ("false") @QueryParam("includeTasks") Boolean includeTasks, + final @DefaultValue ("false") @QueryParam("onlyFailed") Boolean onlyFailed, final @QueryParam("forward") Boolean forward) throws Throwable { -FutureCallback cb = new FutureCallback<>(); -herder.restartConnector(connector, cb); -completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward); +RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks); +if (restartRequest.forciblyRestartConnectorOnly()) { +// For backward compatibility, just restart the connector instance and return OK with no body +FutureCallback cb = new FutureCallback<>(); +herder.restartConnector(connector, cb); +completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", 
"POST", headers, null, forward); +return Response.ok().build(); +} + +FutureCallback cb = new FutureCallback<>(); +herder.restartConnectorAndTasks(restartRequest, cb); +Map queryParameters = new HashMap<>(); +queryParameters.put("includeTasks", String.valueOf(includeTasks)); +queryParameters.put("onlyFailed", String.valueOf(onlyFailed)); +String forwardingPath = "/connectors/" + connector + "/restart"; Review comment: Nit: Move up so you can share with 270? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -186,6 +192,10 @@ private short currentProtocolVersion; private short backoffRetries; +// visible for testing +// The pending restart requests for the connectors; +final NavigableSet pendingRestartRequests = new TreeSet<>(); Review comment: There are a few comments in different places explaining the special equality implementation in RestartRequest. Have we considered making this a Map to make it explicit that we keep the latest per connector, have a more typical equals/hashcode and avoid all the warnings? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -186,6 +192,10 @@ private short currentProtocolVersion; private short backoffRetries; +// visible for testing +// The pending restart requests for the connectors; +final NavigableSet pendingRestartRequests = new TreeSet<>(); Review comment: Just out of curiosity, any particular reason why we want to process these in connectorName order? 
(instead of FIFO) ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +
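The reviewer's two questions about `pendingRestartRequests` (a `Map` keyed by connector, and FIFO rather than name order) can be illustrated with a minimal sketch. It assumes a simplified, hypothetical `PendingRestart` class and `PendingRestartQueue` wrapper; neither is Kafka Connect's actual `RestartRequest` or `DistributedHerder` code, and the PR may well have settled on a different structure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a restart request; NOT Kafka Connect's RestartRequest.
final class PendingRestart {
    final String connectorName;
    final boolean onlyFailed;
    final boolean includeTasks;

    PendingRestart(String connectorName, boolean onlyFailed, boolean includeTasks) {
        this.connectorName = connectorName;
        this.onlyFailed = onlyFailed;
        this.includeTasks = includeTasks;
    }
}

// Hypothetical queue that keeps at most one pending request per connector.
final class PendingRestartQueue {
    // LinkedHashMap iterates in key insertion order, so connectors are
    // processed in the order their first pending request arrived (FIFO).
    private final Map<String, PendingRestart> pending = new LinkedHashMap<>();

    // A newer request for the same connector replaces the older one;
    // re-inserting an existing key does not change its position.
    void add(PendingRestart request) {
        pending.put(request.connectorName, request);
    }

    // Returns all pending requests in arrival order and clears the queue.
    List<PendingRestart> drain() {
        List<PendingRestart> drained = new ArrayList<>(pending.values());
        pending.clear();
        return drained;
    }
}
```

Keying by connector name makes the "latest request wins" rule explicit in the data structure, so the request class needs no special `equals`/`compareTo` contract. A `TreeMap` would give the same replacement behavior with name-ordered processing instead.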
[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on pull request #10593: URL: https://github.com/apache/kafka/pull/10593#issuecomment-860980428 > Or maybe we can make a separate PR to handle the validation for onSnapshotFrozen? I am okay with fixing this in a future PR. Do you want to go ahead and file a sub-task for https://issues.apache.org/jira/browse/KAFKA-10310 and link it here?
[jira] [Created] (KAFKA-12948) NetworkClient.close(node) with node in connecting state makes NetworkClient unusable
Rajini Sivaram created KAFKA-12948: -- Summary: NetworkClient.close(node) with node in connecting state makes NetworkClient unusable Key: KAFKA-12948 URL: https://issues.apache.org/jira/browse/KAFKA-12948 Project: Kafka Issue Type: Bug Components: network Affects Versions: 2.7.1, 2.8.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.7.2, 2.8.1
`NetworkClient.close(node)` closes the node and removes it from `ClusterConnectionStates.nodeState`, but not from `ClusterConnectionStates.connectingNodes`. Subsequent `NetworkClient.poll()` invocations throw IllegalStateException, and this leaves the NetworkClient in an unusable state until the node is removed from connectingNodes or added to nodeState.
We don't use `NetworkClient.close(node)` in clients, but we use it in clients started by brokers for replica fetcher and controller. Since brokers use NetworkClientUtils.isReady() before establishing connections and this invokes poll(), the NetworkClient never recovers.
Exception stack trace:
{code:java}
java.lang.IllegalStateException: No entry found for connection 0
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:409)
    at org.apache.kafka.clients.ClusterConnectionStates.isConnectionSetupTimeout(ClusterConnectionStates.java:446)
    at org.apache.kafka.clients.ClusterConnectionStates.lambda$nodesWithConnectionSetupTimeout$0(ClusterConnectionStates.java:458)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.kafka.clients.ClusterConnectionStates.nodesWithConnectionSetupTimeout(ClusterConnectionStates.java:459)
    at org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:807)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42)
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
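The failure mode described in this issue, two collections that drift out of sync, can be reproduced with a toy model. The sketch below is an illustration only: `ToyConnectionStates` and its method names are hypothetical and are not Kafka's `ClusterConnectionStates`, and the "remove from both" variant is just one plausible way to keep the state consistent, not necessarily the fix that was committed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of two collections that must stay in sync; NOT Kafka's ClusterConnectionStates.
final class ToyConnectionStates {
    private final Map<String, Long> nodeState = new HashMap<>(); // node id -> connect start time (ms)
    private final Set<String> connectingNodes = new HashSet<>();

    // Records that a connection attempt to the node has started.
    void connecting(String id, long nowMs) {
        nodeState.put(id, nowMs);
        connectingNodes.add(id);
    }

    // Mirrors the reported behavior: only one of the two collections is updated.
    void removeFromNodeStateOnly(String id) {
        nodeState.remove(id);
    }

    // Keeps both collections consistent.
    void removeEverywhere(String id) {
        nodeState.remove(id);
        connectingNodes.remove(id);
    }

    // Counts connecting nodes whose setup has taken longer than timeoutMs.
    // Throws if a connecting node has no state entry, as in the stack trace.
    int timedOutCount(long nowMs, long timeoutMs) {
        int count = 0;
        for (String id : connectingNodes) {
            Long startedMs = nodeState.get(id);
            if (startedMs == null) {
                throw new IllegalStateException("No entry found for connection " + id);
            }
            if (nowMs - startedMs > timeoutMs) {
                count++;
            }
        }
        return count;
    }
}
```

Because the timeout check runs on every poll, one stale entry in the connecting set is enough to make every later call fail, which matches the "never recovers" behavior reported above.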
[GitHub] [kafka] rhauch commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs
rhauch commented on a change in pull request #10841: URL: https://github.com/apache/kafka/pull/10841#discussion_r651200941 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -497,6 +476,37 @@ static void validateHeaderConfigAction(String action) { + "Expected one of %s", action, HEADER_ACTIONS)); } } +private static class ListenersValidator implements ConfigDef.Validator { +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +throw new ConfigException("Invalid value, at least one URI is expected, ex: http://localhost:8080,https://localhost:8443."); +} + +if (!(value instanceof List)) { +throw new ConfigException("Invalid value type (list expected)."); +} + +List items = (List) value; +if (items.isEmpty()) { +throw new ConfigException("Invalid value, at least one URI is expected, ex: http://localhost:8080,https://localhost:8443."); +} + +for (Object item: items) { +if (!(item instanceof String)) { +throw new ConfigException("Invalid type for listener (expected String)."); +} +if (Utils.isBlank((String) item)) { +throw new ConfigException("Empty listener found when parsing list."); Review comment: Nit: should we say "listener URL" instead of just "listener"? Or maybe "Empty URL found when parsing listeners list"? 
## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java ## @@ -56,6 +57,39 @@ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " ); +@Test +public void testListenersConfigAllowedValues() { +Map props = baseProps(); + +// no value set for "listeners" +WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG)); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:")); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, https://a.b:7812"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), Arrays.asList("http://a.b:", "https://a.b:7812")); + +new WorkerConfig(WorkerConfig.baseConfigDef(), props); +} + +@Test +public void testListenersConfigNotAllowedValues() { +Map props = baseProps(); + +props.put(WorkerConfig.LISTENERS_CONFIG, ""); +assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props)).printStackTrace(); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:,"); +assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props)); Review comment: Should you check that the exception message contains an expected string? Technically, catching this exception doesn't ensure that the `listeners` property is the one that failed. ## File path: docs/upgrade.html ## @@ -69,7 +69,9 @@ Notable changes in 3 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor instead. The quota.producer.default and quota.consumer.default configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>). Dynamic quota defaults must be used instead. 
-The default value for the consumer configuration session.timeout.ms was increased from 10s to 45s. See +The deprecated worker configurations rest.host.name and rest.port were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12482">KAFKA-12482</a>) in the Kafka Connect. Review comment: Also, should probably add `connect` to the list on line 32. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java ## @@ -56,6 +57,39 @@ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " ); +@Test +public void testListenersConfigAllowedValues() { +Map props = baseProps(); + +// no value set for "listeners" +WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG)); + +props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:"); +config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); +assertEquals(config.getList(WorkerConfi
[jira] [Commented] (KAFKA-12946) __consumer_offsets topic with very big partitions
[ https://issues.apache.org/jira/browse/KAFKA-12946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363155#comment-17363155 ] Ron Dagostino commented on KAFKA-12946: ---
If the partition isn't being cleaned then you can try setting min.cleanable.dirty.ratio=0 for the __consumer_offsets topic; this might allow it to get cleaned. You can delete that config after a while to let the value default back.
Another possibility might exist if one of the follower replicas has a significantly smaller size than the leader; in such cases you can move leadership to the smaller replica and then reassign the follower replicas to new brokers so that they will copy the (much smaller-sized) data; then you can migrate the followers back to where they were originally and move the leader back to the original leader. This solution will only work if you have more brokers than the replication factor.
Finally, take a look at https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. You may not have any other options right now if it is a hanging transaction, but help is coming.
> __consumer_offsets topic with very big partitions
> -
>
> Key: KAFKA-12946
> URL: https://issues.apache.org/jira/browse/KAFKA-12946
> Project: Kafka
> Issue Type: Bug
> Components: log cleaner
> Affects Versions: 2.0.0
> Reporter: Emi
> Priority: Critical
>
> I am using Kafka 2.0.0 with java 8u191.
> There is a partition of the __consumer_offsets topic that is 600 GB, with 6000 segments older than 4 months. Other partitions of that topic are small: 20-30 MB.
> There are 60 consumer groups, 90 topics and 100 partitions per topic.
> There aren't errors in the logs. From the log of the log cleaner, I can see that the partition is never touched by the log cleaner thread for compaction; it only adds new segments.
> How is this possible?
> There was another partition with the same problem, but after some months it has been compacted. Now there is only one partition with this problem, but this one is bigger and keeps growing.
> I have used the kafka-dump-log tool to check these old segments and I can see many duplicates. So I would assume that it is not compacted.
> My settings:
> {{offsets.commit.required.acks = -1}}
> {{offsets.commit.timeout.ms}} = 5000
> {{offsets.load.buffer.size = 5242880}}
> {{offsets.retention.check.interval.ms}} = 60
> {{offsets.retention.minutes = 10080}}
> {{offsets.topic.compression.codec = 0}}
> {{offsets.topic.num.partitions = 50}}
> {{offsets.topic.replication.factor = 3}}
> {{offsets.topic.segment.bytes = 104857600}}
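To show why lowering `min.cleanable.dirty.ratio` to 0 might make a partition eligible for compaction, here is a rough sketch of a threshold check. It rests on a stated assumption: that a log becomes a compaction candidate when the fraction of its bytes that has not yet been cleaned exceeds the configured ratio. `ToyCleanableCheck` and its methods are hypothetical and are not Kafka's log cleaner code, which also applies other conditions.

```java
// Toy model of a compaction eligibility check; NOT Kafka's log cleaner implementation.
final class ToyCleanableCheck {
    // Fraction of the log (by bytes) that has not been compacted yet.
    static double dirtyRatio(long cleanBytes, long dirtyBytes) {
        long total = cleanBytes + dirtyBytes;
        return total == 0 ? 0.0 : (double) dirtyBytes / total;
    }

    // Assumption: a log is eligible when its dirty ratio exceeds the threshold.
    static boolean eligible(long cleanBytes, long dirtyBytes, double minCleanableDirtyRatio) {
        return dirtyRatio(cleanBytes, dirtyBytes) > minCleanableDirtyRatio;
    }
}
```

Under this assumption, a threshold of 0 makes any log with at least one dirty byte eligible. It would not help if the cleaner is blocked for another reason, such as the hanging transaction case mentioned in the comment.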
[GitHub] [kafka] dajac commented on pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`
dajac commented on pull request #10863: URL: https://github.com/apache/kafka/pull/10863#issuecomment-860921205 @hachikuji Thanks for the clarification. Yes, that makes sense to me as well.
[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer
kowshik edited a comment on pull request #10280: URL: https://github.com/apache/kafka/pull/10280#issuecomment-860537313 @junrao Thanks for the review! I ran the system tests.
1. [System test run #4560](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4560/) on top of the latest commit 008b701386ce5a4d892d6ac5b90798b981c4fba0 from this PR. The run finished with 12 test failures.
2. [System test run #4561](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4561/) against AK trunk on top of commit 6de37e536ac76ef13530d49dc7320110332cd1ee which does not contain changes from this PR. The run finished with 13 test failures.
There were 11 overlapping failures in both (1) and (2). For these, I didn't find anything abnormal in the logs so far; the failure reason seems similar in both. The only new failure in (1) that's not present in (2) was:
```
Module: kafkatest.tests.client.consumer_test
Class: OffsetValidationTest
Method: test_broker_failure
Arguments: { "clean_shutdown": true, "enable_autocommit": false, "metadata_quorum": "REMOTE_KRAFT" }
```
Logs indicate that the test failed [at this line](https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/tests/kafkatest/tests/client/consumer_test.py#L388) because one of the worker nodes running the consumer didn't complete within the timeout of 30s. This doesn't seem indicative of a real failure (yet), so I'm rerunning the system tests in [test run #4562](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4562/) to check if the failure is consistent. I'll keep you posted on the outcome of this second run.
[GitHub] [kafka] hachikuji commented on pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`
hachikuji commented on pull request #10863: URL: https://github.com/apache/kafka/pull/10863#issuecomment-860914096 @dajac Given the background from KAFKA-10134, the rebalance timeout seems like the right one in my mind. We're basically allowing for a delay between the JoinGroup and SyncGroup caused by consumer processing while the rebalance is in progress. This time is bounded by max.poll.interval.ms which is used as the rebalance timeout internally. Does that make sense?
[GitHub] [kafka] dajac commented on pull request #10843: MINOR: Log formatting for exceptions during configuration related operations
dajac commented on pull request #10843: URL: https://github.com/apache/kafka/pull/10843#issuecomment-860910847 @YiDing-Duke Thanks for the update. @showuon Does the PR look good to you now?
[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on pull request #10593: URL: https://github.com/apache/kafka/pull/10593#issuecomment-860908546 > I assume a KIP is needed. We don't need a KIP. All of these APIs are internal and are not accessible or published to projects external to Apache Kafka.
[GitHub] [kafka] cmccabe commented on pull request #10804: KAFKA-12877: Make flexibleVersions mandatory
cmccabe commented on pull request #10804: URL: https://github.com/apache/kafka/pull/10804#issuecomment-860882713 OK, I reconsidered this a bit. I think we DON'T want to make incompatible changes if we can avoid it... such as changing the default from NONE to ALL. There are a few reasons for this. One is that we eventually want to have a verifier tool that checks that changes between message files shipped with version X and those shipped with version X+1 are compatible. This is a lot easier if we refrain from gratuitous compatibility breaks. Another is that there are external projects that are looking at these JSON files now. There's no formal contract or anything, but it's just nicer for the ecosystem not to break things. Finally, downstream forks of Kafka may have a difficult time if we change defaults around like this. For example, if someone had an in-house fork of Kafka with some new messages, changing around defaults could cause a compatibility break for them. It's better not to have to worry about this. For all these reasons, I think it's best just to require explicitly spelling out `flexibleVersions` for now. It's a little extra boilerplate but not that bad in the grand scheme of things. Maybe eventually we can have more versioning in the JSON file that lets us elide some of this, but for now I think this is the best way to go. cc @hachikuji
[GitHub] [kafka] hachikuji commented on pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion
hachikuji commented on pull request #10880: URL: https://github.com/apache/kafka/pull/10880#issuecomment-860877998 Note I have a few additional tests which I will post shortly.
[GitHub] [kafka] wycccccc opened a new pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for StreamsMe…
wycc opened a new pull request #10881: URL: https://github.com/apache/kafka/pull/10881 Development of EasyMock and PowerMock has stagnated while Mockito continues to be actively developed. With the new Java cadence, it's a problem to depend on libraries that do bytecode generation and are not actively maintained. In addition, Mockito is also easier to use. [KAFKA-7438](https://issues.apache.org/jira/browse/KAFKA-7438) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-12944) KIP-724: Always write record batches with message format v2
[ https://issues.apache.org/jira/browse/KAFKA-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12944: Description: If IBP is 3.0 or higher. > KIP-724: Always write record batches with message format v2 > --- > > Key: KAFKA-12944 > URL: https://issues.apache.org/jira/browse/KAFKA-12944 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.0.0 > > > If IBP is 3.0 or higher.
[jira] [Created] (KAFKA-12947) Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ...
YI-CHEN WANG created KAFKA-12947: Summary: Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ... Key: KAFKA-12947 URL: https://issues.apache.org/jira/browse/KAFKA-12947 Project: Kafka Issue Type: Sub-task Reporter: YI-CHEN WANG Assignee: YI-CHEN WANG For KAFKA-7438
[GitHub] [kafka] hachikuji opened a new pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion
hachikuji opened a new pull request #10880: URL: https://github.com/apache/kafka/pull/10880 We had been using `RecordAccumulator.beginFlush` in order to force the `RecordAccumulator` to flush pending batches when a transaction was being completed. Internally, `RecordAccumulator` has a simple counter for the number of flushes in progress. The count gets incremented in `beginFlush` and it is expected to be decremented by `awaitFlushCompletion`. The second call to decrement the counter never happened in the transactional path, so the counter could get stuck at a positive value, which means that the linger time would effectively be ignored. The patch here fixes the problem by removing the use of `beginFlush` in `Sender`. Instead, we now add an additional condition in `RecordAccumulator` to explicitly check when a transaction is being completed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
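The counter problem described in this PR can be sketched with a toy model. The class below is hypothetical: `ToyAccumulator`, `endFlush`, and `readyToSend` are illustrative names and not the real `RecordAccumulator` API. It only demonstrates how an unpaired increment leaves the linger time permanently bypassed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a flush-in-progress counter; NOT Kafka's RecordAccumulator.
final class ToyAccumulator {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);
    private final long lingerMs;

    ToyAccumulator(long lingerMs) {
        this.lingerMs = lingerMs;
    }

    // Marks the start of a flush; every call needs a matching endFlush().
    void beginFlush() {
        flushesInProgress.incrementAndGet();
    }

    // Marks the end of a flush. If this is never called after beginFlush(),
    // the counter stays positive forever.
    void endFlush() {
        flushesInProgress.decrementAndGet();
    }

    // A batch is sendable once it has lingered long enough,
    // or immediately while any flush is in progress.
    boolean readyToSend(long batchAgeMs) {
        return flushesInProgress.get() > 0 || batchAgeMs >= lingerMs;
    }
}
```

With a linger of 100 ms, a 10 ms old batch is not sendable before `beginFlush()`, becomes sendable right after it, and stays sendable until the matching `endFlush()` runs. If the decrement is skipped, every later batch is sent immediately regardless of age, which is the "linger time effectively ignored" symptom.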
[GitHub] [kafka] dajac commented on pull request #10878: KAFKA-12898; Owned partitions in the subscription must be sorted
dajac commented on pull request #10878: URL: https://github.com/apache/kafka/pull/10878#issuecomment-860843508 I forgot to update a few tests.