[GitHub] [kafka] showuon commented on pull request #12908: MINOR: Prevent NPE in SmokeTestDriver (fix flaky test)
showuon commented on PR #12908: URL: https://github.com/apache/kafka/pull/12908#issuecomment-1336108293

Failed with some `MemoryRecordsBuilderTest` tests. Re-run jenkins: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12908/4/

```
Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[4] magic=0, bufferOffset=0, compressionType=gzip
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[5] magic=1, bufferOffset=0, compressionType=gzip
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[10] magic=0, bufferOffset=0, compressionType=lz4
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[15] magic=1, bufferOffset=15, compressionType=none
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[19] magic=2, bufferOffset=15, compressionType=gzip
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[20] magic=0, bufferOffset=15, compressionType=snappy
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[22] magic=2, bufferOffset=15, compressionType=snappy
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #12940: MINOR: Remove lock contention while adding sensors
showuon commented on PR #12940: URL: https://github.com/apache/kafka/pull/12940#issuecomment-1336107162 Re-run the jenkins: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12940/4/
[GitHub] [kafka] mumrah opened a new pull request, #12946: KAFKA-14427 ZK client support for migrations
mumrah opened a new pull request, #12946: URL: https://github.com/apache/kafka/pull/12946 TODO
[GitHub] [kafka] mumrah merged pull request #12928: KAFKA-14304 Add RPC changes, records, and config from KIP-866
mumrah merged PR #12928: URL: https://github.com/apache/kafka/pull/12928
[GitHub] [kafka] kirktrue opened a new pull request, #12945: KAFKA-14365: Refactor Fetcher to allow different implementations
kirktrue opened a new pull request, #12945: URL: https://github.com/apache/kafka/pull/12945

This change refactors the existing `Fetcher` by splitting out the parts related to offset listing into a dedicated class named `OffsetsFinder`. This allows the `Fetcher` implementation to be more focused and thus easier to refactor (in coming commits).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-14439) Specify returned errors for various APIs and versions
[ https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642726#comment-17642726 ] Jason Gustafson edited comment on KAFKA-14439 at 12/2/22 11:52 PM:
---
Yeah, we've had so many compatibility breaks due to error code usage. Putting the errors into the spec would also enable better enforcement. One option could be something like this:
{code:java}
{
  "name": "ErrorCode",
  "type": "enum16",
  "versions": "0+",
  "about": "The error code,",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"},
    {"value": 4, "versions": "1+", "about": "Invalid fetch size"},
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in the `values` field. New values can only be added with a version bump.

was (Author: hachikuji):
Yeah, we've had so many compatibility breaks due to error code usage. Putting the errors into the spec would also enable better enforcement. One option could be something like this:
{code:java}
{
  "name": "ErrorCode",
  "type": "enum16",
  "versions": "0+",
  "about": "The error code,",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"},
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in the `values` field. New values can only be added with a version bump.

> Specify returned errors for various APIs and versions
> -----------------------------------------------------
>
> Key: KAFKA-14439
> URL: https://issues.apache.org/jira/browse/KAFKA-14439
> Project: Kafka
> Issue Type: Task
> Reporter: Justine Olshan
> Priority: Major
>
> Kafka is known for supporting various clients and being compatible across different versions. But one thing that is a bit unclear is what errors each response can send.
> Knowing what errors can come from each version helps those who implement clients have a more defined spec for what errors they need to handle. When new errors are added, it is clearer to the clients that changes need to be made.
> It also helps contributors get a better understanding about how clients are expected to react and potentially find and prevent gaps like the one found in https://issues.apache.org/jira/browse/KAFKA-14417
> I briefly synced offline with [~hachikuji] about this and he suggested maybe adding values for the error codes in the schema definitions of APIs that specify the error codes and what versions they are returned on. One idea was creating some enum type to accomplish this.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
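The version-gating rule in the proposed spec ("new values can only be added with a version bump") can be illustrated with a small client-side sketch. This is hypothetical code of my own, not Kafka's generated message classes: it mirrors the `values` entries from the JSON above and checks whether a given error value is legal on a given API version.

```java
// Hypothetical sketch (not Kafka code): how the proposed "enum16" spec
// could drive client-side validation of error codes per API version.
public class ErrorCodeSpec {
    // {value, firstVersion} pairs mirroring the JSON "values" entries above
    static final short[][] VALUES = {{0, 0}, {1, 1}, {4, 1}};

    // A value is legal on a given API version only if that version is at
    // or above the version that introduced the value.
    public static boolean isValidForVersion(short value, short apiVersion) {
        for (short[] entry : VALUES) {
            if (entry[0] == value) {
                return apiVersion >= entry[1];
            }
        }
        return false; // unknown value: schema enforcement would reject it
    }
}
```

With a table like this generated from the schema, a client can reject an error code that the broker should never have sent for the negotiated version, which is the enforcement benefit discussed in the comment.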
[jira] [Commented] (KAFKA-14439) Specify returned errors for various APIs and versions
[ https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642735#comment-17642735 ] Ismael Juma commented on KAFKA-14439:
---
I think it would also be useful for the server to indicate whether an error is retriable or not. That would make protocol evolution a lot more flexible.
[jira] [Updated] (KAFKA-14438) Stop supporting empty consumer groupId
[ https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14438: Priority: Blocker (was: Major)

> Stop supporting empty consumer groupId
> --------------------------------------
>
> Key: KAFKA-14438
> URL: https://issues.apache.org/jira/browse/KAFKA-14438
> Project: Kafka
> Issue Type: Task
> Components: consumer
> Reporter: Philip Nee
> Priority: Blocker
> Fix For: 4.0.0
>
> Currently, a warning message is logged upon using an empty consumer groupId. In the next major release, we should drop the support of empty ("") consumer groupId.
>
> cc [~hachikuji]
[jira] [Comment Edited] (KAFKA-14439) Specify returned errors for various APIs and versions
[ https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642726#comment-17642726 ] Jason Gustafson edited comment on KAFKA-14439 at 12/2/22 11:44 PM:
---
Yeah, we've had so many compatibility breaks due to error code usage. Putting the errors into the spec would also enable better enforcement. One option could be something like this:
{code:java}
{
  "name": "ErrorCode",
  "type": "enum16",
  "versions": "0+",
  "about": "The error code,",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"},
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in the `values` field. New values can only be added with a version bump.

was (Author: hachikuji):
Yeah, we've had so many compatibility breaks due to error code usage. Putting the errors into the spec would also enable better enforcement. One option could be something like this:
{code:java}
{
  "name": "ErrorCode",
  "type": "enum16",
  "versions": "0+",
  "about": "The error code,",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"},
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in the `values` field. New values can only be added with a version bump.
[jira] [Commented] (KAFKA-14439) Specify returned errors for various APIs and versions
[ https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642726#comment-17642726 ] Jason Gustafson commented on KAFKA-14439:
---
Yeah, we've had so many compatibility breaks due to error code usage. Putting the errors into the spec would also enable better enforcement. One option could be something like this:
{code:java}
{
  "name": "ErrorCode",
  "type": "enum16",
  "versions": "0+",
  "about": "The error code,",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"},
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in the `values` field. New values can only be added with a version bump.
[jira] [Created] (KAFKA-14439) Specify returned errors for various APIs and versions
Justine Olshan created KAFKA-14439:
-----------------------------------

Summary: Specify returned errors for various APIs and versions
Key: KAFKA-14439
URL: https://issues.apache.org/jira/browse/KAFKA-14439
Project: Kafka
Issue Type: Task
Reporter: Justine Olshan

Kafka is known for supporting various clients and being compatible across different versions. But one thing that is a bit unclear is what errors each response can send.

Knowing what errors can come from each version helps those who implement clients have a more defined spec for what errors they need to handle. When new errors are added, it is clearer to the clients that changes need to be made.

It also helps contributors get a better understanding about how clients are expected to react and potentially find and prevent gaps like the one found in https://issues.apache.org/jira/browse/KAFKA-14417

I briefly synced offline with [~hachikuji] about this and he suggested maybe adding values for the error codes in the schema definitions of APIs that specify the error codes and what versions they are returned on. One idea was creating some enum type to accomplish this.
[jira] [Assigned] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant reassigned KAFKA-14437: Assignee: Andrew Grant

> Enhance StripedReplicaPlacer to account for existing partition assignments
> --------------------------------------------------------------------------
>
> Key: KAFKA-14437
> URL: https://issues.apache.org/jira/browse/KAFKA-14437
> Project: Kafka
> Issue Type: Improvement
> Reporter: Andrew Grant
> Assignee: Andrew Grant
> Priority: Major
>
> Currently, in StripedReplicaPlacer we don't take existing partition assignments into consideration when the place method is called. This means that new partitions added may get the same assignments as existing partitions. This differs from AdminUtils, which has some logic to try and shift where in the list of brokers we start making assignments from for new partitions added.
> For example, let's say we had the following:
> {code:java}
> Rack 1: 0, 1, 2, 3
> Rack 2: 4, 5, 6, 7
> Rack 3: 8, 9, 10, 11
> {code}
> CreateTopics might return the following assignment for two partitions:
> {code:java}
> P0: 6, 8, 2
> P1: 9, 3, 7
> {code}
> If the user then calls CreatePartitions, increasing the partition count to 4, StripedReplicaPlacer does not take into account P0 and P1. It creates a random rack offset and a random broker offset, so it could easily create the same assignment for P3 and P4 that it created for P0 and P1. This is easily reproduced in a unit test.
> My suggestion is to enhance StripedReplicaPlacer to account for existing partition assignments. Intuitively, we'd like to make assignments for added partitions from "where we left off" when we were making the previous assignments. In practice, it's not possible to know exactly what the state was during the previous partition assignments because, for example, brokers' fencing state may have changed. But I do think we can make a best-effort attempt that is optimized for the common case where most brokers are unfenced.
> Note, all the changes suggested below only affect StripedReplicaPlacer when place is called and there are existing partition assignments, which happens when it is servicing CreatePartitions requests. If there are no existing partition assignments, which happens during CreateTopics, the logic is unchanged.
> First, we need to update ClusterDescriber to:
> {code:java}
> public interface ClusterDescriber {
>     /**
>      * Get an iterator through the usable brokers.
>      */
>     Iterator<UsableBroker> usableBrokers();
>
>     List<List<Integer>> replicasForTopicName(String topicName);
> }
> {code}
> The replicasForTopicName method returns the existing partition assignments. This will enable StripedReplicaPlacer to know about existing partition assignments when they exist.
> When place is called, some initialization is done in both RackList and BrokerList. One thing that is initialized is the offset variable - this is a variable used in both RackList and BrokerList that determines where in the list of either racks or brokers respectively we should start from when making the next assignment. Currently, it is initialized to a random value, based off the size of the list.
> I suggest we add some logic during initialization that sets the offset for both RackList and BrokerList to a value based off the previous assignments. Consider again the following rack metadata and existing assignments:
> {code:java}
> Rack 1: 0, 1, 2, 3
> Rack 2: 4, 5, 6, 7
> Rack 3: 8, 9, 10, 11
>
> P0: 6, 8, 2
> P1: 9, 3, 7
> {code}
> Let's imagine a user wants to create a new partition, P3.
> First, we need to determine which rack to start from for P3: this corresponds to the initial offset in RackList. We can look at the leader of P1 (not P0, because P1 is the "last" partition we made an assignment for) and see it is on rack 3. So, the next rack we should start from is rack 1. This means we set the offset in RackList to 0, instead of a random value, during initialization.
> Second, we need to determine which broker to start from _per rack_: this corresponds to the initial offset in BrokerList. We can look at all the existing partition assignments, P0 and P1 in our example, and _per rack_ infer the last offset started from during previous assignments. For each rack, we do this by iterating through each partition, in reverse order because we care about the most recent starting position, and try to find the first broker in the assignment from the rack. This enables us to know where we last started from when making an assignment for that rack, which can be used to determine where to continue on from within that rack.
> So in our example, for rack 1 we can see the last broker we started from was broker 3 in
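The per-rack inference described above can be sketched in isolation. The class and method names below are mine, not Kafka's: scan the existing assignments in reverse (most recent partition first) and, within each assignment, take the first broker that belongs to the given rack.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the per-rack "last starting broker" inference
// described in the ticket; not actual StripedReplicaPlacer code.
public class OffsetInference {
    // For one rack, scan partitions in reverse order and return the broker
    // the most recent assignment started from on that rack, or -1 if the
    // rack's brokers never appear in any assignment.
    public static int lastStartBroker(List<List<Integer>> assignments,
                                      Set<Integer> rackBrokers) {
        for (int p = assignments.size() - 1; p >= 0; p--) {
            for (int broker : assignments.get(p)) {
                if (rackBrokers.contains(broker)) {
                    return broker; // first broker of this assignment on the rack
                }
            }
        }
        return -1;
    }
}
```

Running this on the example above (P0: 6, 8, 2 and P1: 9, 3, 7) yields broker 3 for rack 1, matching the walkthrough: P1 is scanned first, and its first broker from rack 1 is 3.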
[jira] [Created] (KAFKA-14438) Stop supporting empty consumer groupId
Philip Nee created KAFKA-14438:
-------------------------------

Summary: Stop supporting empty consumer groupId
Key: KAFKA-14438
URL: https://issues.apache.org/jira/browse/KAFKA-14438
Project: Kafka
Issue Type: Task
Components: consumer
Reporter: Philip Nee
Fix For: 4.0.0

Currently, a warning message is logged upon using an empty consumer groupId. In the next major release, we should drop the support of empty ("") consumer groupId.

cc [~hachikuji]
[GitHub] [kafka] lihaosky opened a new pull request, #12944: [KAFKA-14395] add config to configure client supplier
lihaosky opened a new pull request, #12944: URL: https://github.com/apache/kafka/pull/12944

## Description
This PR is for https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams. It adds a new `default.client.supplier` config to configure the `KafkaClientSupplier`.

## Test
Unit test

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
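For readers unfamiliar with KIP-884, usage would presumably look like any other Streams config entry. The config key `default.client.supplier` comes from the PR description; the supplier class name below is purely illustrative.

```java
import java.util.Properties;

// Sketch of how an application might set the new config from KIP-884.
// "com.example.MyClientSupplier" is a hypothetical implementation of
// KafkaClientSupplier, shown here only to illustrate the shape.
public class ClientSupplierConfigDemo {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("default.client.supplier", "com.example.MyClientSupplier");
        return props;
    }
}
```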
[GitHub] [kafka] hachikuji commented on a diff in pull request #12897: KAFKA-14379: consumer should refresh preferred read replica on update metadata
hachikuji commented on code in PR #12897: URL: https://github.com/apache/kafka/pull/12897#discussion_r1038530374

## clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
```
@@ -208,10 +209,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
         for (int i = 0; i < numPartitions; i++) {
             TopicPartition tp = new TopicPartition(topic, i);
             Node leader = nodes.get(i % nodes.size());
-            List<Integer> replicaIds = Collections.singletonList(leader.id());
+            List<Integer> replicaIds = nodes.stream().map(Node::id).collect(Collectors.toList());
```

Review Comment: Leaving the replication factor as implicit seems less than ideal. Perhaps we could make it an explicit argument?
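One way to read the review comment: the `+` line implicitly makes the replication factor equal to the number of nodes. A version with the factor as an explicit argument might look like the sketch below. This is illustrative code of my own, not the actual `RequestTestUtils` change; names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch: build a replica list of an explicit size, starting
// at the leader's index and wrapping around the node list, instead of
// implicitly using every node as a replica.
public class ReplicaIdsDemo {
    public static List<Integer> replicaIds(List<Integer> nodeIds,
                                           int leaderIndex,
                                           int replicationFactor) {
        return IntStream.range(0, replicationFactor)
                .map(j -> nodeIds.get((leaderIndex + j) % nodeIds.size()))
                .boxed()
                .collect(Collectors.toList());
    }
}
```

With three nodes and `replicationFactor = 2`, only two replicas are assigned per partition, so tests can vary the factor independently of cluster size.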
[jira] [Resolved] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time
[ https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6643. Resolution: Won't Fix

> Warm up new replicas from scratch when changelog topic has LIMITED retention time
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Navinder Brar
> Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal topics holding all the data for the store) which are used to build the state of replicas. So, if we keep the number of standby replicas at 1, we still have more availability for persistent state stores, as changelog Kafka topics are also replicated depending upon the broker replication policy, but that also means we are using at least 4 times the space (1 master store, 1 replica store, 1 changelog, 1 changelog replica).
> Now if we have a year's data in persistent stores (RocksDB), we don't want the changelog topics to hold a year's data, as it will put an unnecessary burden on brokers (in terms of space). If we have to scale our Kafka Streams application (having 200-300 TBs of data) we have to scale the Kafka brokers as well. We want to reduce this dependency and find ways to just use the changelog topic as a queue, holding just 2 or 3 days of data, and warm up the replicas from scratch in some other way.
> I have a few proposals in that respect:
> 1. Use a new Kafka topic related to each partition which we need to warm up on the fly (when the node containing that partition crashes). Produce into this topic from another replica/active and build the new replica through this topic.
> 2. Use peer-to-peer file transfer (such as SFTP), as RocksDB can create backups, which can be transferred from the source node to the destination node when a new replica has to be built from scratch.
> 3. Use HDFS as an intermediate instead of a Kafka topic, where we can keep scheduled backups for each partition and use those to build new replicas.
[jira] [Updated] (KAFKA-14379) consumer should refresh preferred read replica on update metadata
[ https://issues.apache.org/jira/browse/KAFKA-14379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14379: Priority: Critical (was: Major)

> consumer should refresh preferred read replica on update metadata
> -----------------------------------------------------------------
>
> Key: KAFKA-14379
> URL: https://issues.apache.org/jira/browse/KAFKA-14379
> Project: Kafka
> Issue Type: Bug
> Reporter: Jeff Kim
> Assignee: Jeff Kim
> Priority: Critical
> Fix For: 3.4.0
>
> The consumer (fetcher) refreshes the preferred read replica only on three conditions:
> # the consumer receives an OFFSET_OUT_OF_RANGE error
> # the follower does not exist in the client's metadata (i.e., offline)
> # after metadata.max.age.ms (5 min default)
> For other errors, it will continue to reach out to the possibly unavailable follower, and only after 5 minutes will it refresh the preferred read replica and go back to the leader.
> A specific example is when a partition is reassigned. The consumer will get NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the preferred read replica will not be refreshed as the follower is still online. It will continue to reach out to the old follower until the preferred read replica expires.
> The consumer can instead refresh its preferred read replica whenever it makes a metadata update request, so when the consumer receives e.g. NOT_LEADER_OR_FOLLOWER it can find the new preferred read replica without waiting for the expiration.
[jira] [Updated] (KAFKA-14379) consumer should refresh preferred read replica on update metadata
[ https://issues.apache.org/jira/browse/KAFKA-14379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14379: Fix Version/s: 3.4.0
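The fix proposed in KAFKA-14379 amounts to invalidating the cached preferred read replica on every metadata update rather than waiting for expiry. A minimal sketch of that state machine, with names of my own invention (the real Fetcher code is more involved):

```java
import java.util.Optional;

// Hypothetical sketch of the ticket's proposal: drop the cached preferred
// read replica whenever a metadata update is processed, so the next fetch
// goes back through the (possibly new) leader instead of a stale follower.
public class PreferredReplicaState {
    private Optional<Integer> preferredReadReplica = Optional.empty();

    public void setPreferredReadReplica(int nodeId) {
        preferredReadReplica = Optional.of(nodeId);
    }

    // Called when a metadata response is handled, e.g. after the broker
    // returned NOT_LEADER_OR_FOLLOWER and a metadata refresh was triggered.
    public void onMetadataUpdate() {
        preferredReadReplica = Optional.empty();
    }

    public Optional<Integer> preferredReadReplica() {
        return preferredReadReplica;
    }
}
```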
[jira] [Resolved] (KAFKA-6542) Tables should trigger joins too, not just streams
[ https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6542. Resolution: Invalid

This will be fixed via versioned stores and "delayed table lookups".

> Tables should trigger joins too, not just streams
> -------------------------------------------------
>
> Key: KAFKA-6542
> URL: https://issues.apache.org/jira/browse/KAFKA-6542
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Antony Stubbs
> Priority: Major
>
> At the moment it's quite possible to have a race condition when joining a stream with a table: if the stream event arrives first, before the table event, the join will fail.
> This is also related to bootstrapping KTables (which is what a GKTable does).
> Related to: KAFKA-4113 Allow KTable bootstrap
[jira] [Resolved] (KAFKA-6509) Add additional tests for validating store restoration completes before Topology is intitalized
[ https://issues.apache.org/jira/browse/KAFKA-6509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6509. Resolution: Not A Problem

> Add additional tests for validating store restoration completes before Topology is intitalized
> ----------------------------------------------------------------------------------------------
>
> Key: KAFKA-6509
> URL: https://issues.apache.org/jira/browse/KAFKA-6509
> Project: Kafka
> Issue Type: Test
> Components: streams
> Reporter: Bill Bejeck
> Priority: Major
> Labels: newbie
>
> Right now
[jira] [Resolved] (KAFKA-5245) KStream builder should capture serdes
[ https://issues.apache.org/jira/browse/KAFKA-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-5245. Resolution: Fixed

> KStream builder should capture serdes
> --------------------------------------
>
> Key: KAFKA-5245
> URL: https://issues.apache.org/jira/browse/KAFKA-5245
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.2.0, 0.10.2.1
> Reporter: Yeva Byzek
> Priority: Minor
> Labels: needs-kip
>
> Even if one specifies a serdes in `builder.stream`, later a call to `groupByKey` may require the serdes again if it differs from the configured streams app serdes. The preferred behavior is that if no serdes is provided to `groupByKey`, it should use whatever was provided in `builder.stream` and not what was in the app.
> From the current docs:
> “When to set explicit serdes: Variants of groupByKey exist to override the configured default serdes of your application, which you must do if the key and/or value types of the resulting KGroupedStream do not match the configured default serdes.”
[jira] [Resolved] (KAFKA-5085) Add test for rebalance exceptions
[ https://issues.apache.org/jira/browse/KAFKA-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-5085. Resolution: Abandoned

Might not be necessary any longer given all the changes we did over the years.

> Add test for rebalance exceptions
> ---------------------------------
>
> Key: KAFKA-5085
> URL: https://issues.apache.org/jira/browse/KAFKA-5085
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Matthias J. Sax
> Priority: Major
>
> We currently lack a proper test for the case that an exception is thrown during rebalance within the Streams rebalance listener.
> We recently had a bug for which the app hung on an exception because the exception was not handled properly (KAFKA-5073). Writing a test might require some code refactoring to make testing simpler in the first place.
[jira] [Updated] (KAFKA-5493) Optimize calls to flush for tasks and standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-5493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-5493:
Labels: eos (was: )

> Optimize calls to flush for tasks and standby tasks
> Key: KAFKA-5493
> URL: https://issues.apache.org/jira/browse/KAFKA-5493
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bill Bejeck
> Priority: Major
> Labels: eos
>
> With EOS enabled we don't checkpoint on {{commit}}, so there is no need to call {{flush}} when committing _top-level tasks_. However, for _standby tasks_ we still checkpoint and thus still need to flush when committing. We need to develop an approach that avoids flushing on commit for top-level tasks while preserving flush-on-commit for standby tasks.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038429334 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class CoordinatorRequestManager implements RequestManager { + +private final Logger log; +private final Time time; +private final long requestTimeoutMs; +private final ErrorEventHandler errorHandler; +private final long rebalanceTimeoutMs; +private final Optional groupId; Review Comment: GroupId must not be null for `FindCoordinator` request. If groupId is null, we don't need `CoordinatorRequestManager`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038427640 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class CoordinatorRequestManager implements RequestManager { + +private final Logger log; +private final Time time; +private final long requestTimeoutMs; +private final ErrorEventHandler errorHandler; +private final long rebalanceTimeoutMs; +private final Optional groupId; + +private final CoordinatorRequestState coordinatorRequestState; +private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while +private Node coordinator; + + +public CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final Optional groupId, + final long rebalanceTimeoutMs) { +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); +this.coordinatorRequestState = new CoordinatorRequestState(config); +} + 
+CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ErrorEventHandler errorHandler, + final Optional groupId, + final long rebalanceTimeoutMs, + final long requestTimeoutMs, + final CoordinatorRequestState coordinatorRequestState) { +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.requestTimeoutMs = requestTimeoutMs; +this.coordinatorRequestState = coordinatorRequestState; +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +if (coordinatorRequestState.canSendRequest(currentTimeMs)) { +NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs); +return new NetworkClientDelegate.PollResult(-1, Collections.singletonList(request)); +} + +return new NetworkClientDelegate.PollResult( +coordinatorRequestState.remainingBackoffMs(currentTimeMs), +new ArrayList<>()); +} + +private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) { +coordinatorRequestState.updateLastSend(currentTimeMs); +FindCoordinatorRequestData data = new FindCoordinatorRequestData() +.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id()) +
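The `poll()` logic quoted above either sends a FindCoordinator request or returns `remainingBackoffMs` so the caller knows how long to wait. A minimal, self-contained sketch of that backoff bookkeeping follows; all names here are illustrative, and the real `CoordinatorRequestState` delegates to `org.apache.kafka.common.utils.ExponentialBackoff` rather than computing the schedule inline.

```java
// Illustrative backoff state, loosely modeled on the quoted CoordinatorRequestState.
// Not the actual Kafka implementation.
public class BackoffState {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private long lastSentMs = -1; // -1: no request sent yet
    private int attempts = 0;

    public BackoffState(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // Exponential backoff: initial * 2^(attempts - 1), capped at maxBackoffMs.
    private long currentBackoffMs() {
        if (attempts == 0) return 0;
        return Math.min(initialBackoffMs << (attempts - 1), maxBackoffMs);
    }

    public boolean canSendRequest(long nowMs) {
        return lastSentMs == -1 || nowMs - lastSentMs >= currentBackoffMs();
    }

    public long remainingBackoffMs(long nowMs) {
        if (lastSentMs == -1) return 0;
        return Math.max(0, currentBackoffMs() - (nowMs - lastSentMs));
    }

    public void updateLastSend(long nowMs) {
        lastSentMs = nowMs;
        attempts++;
    }
}
```

The caller's shape then mirrors the quoted `poll()`: if `canSendRequest` is true, build and enqueue the request and call `updateLastSend`; otherwise return `remainingBackoffMs` with an empty request list.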
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038427945 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class CoordinatorRequestManager implements RequestManager { + +private final Logger log; +private final Time time; +private final long requestTimeoutMs; +private final ErrorEventHandler errorHandler; +private final long rebalanceTimeoutMs; +private final Optional groupId; + +private final CoordinatorRequestState coordinatorRequestState; +private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while +private Node coordinator; + + +public CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final Optional groupId, + final long rebalanceTimeoutMs) { +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); +this.coordinatorRequestState = new CoordinatorRequestState(config); +} + 
+CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ErrorEventHandler errorHandler, + final Optional groupId, + final long rebalanceTimeoutMs, + final long requestTimeoutMs, + final CoordinatorRequestState coordinatorRequestState) { +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.requestTimeoutMs = requestTimeoutMs; +this.coordinatorRequestState = coordinatorRequestState; +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +if (coordinatorRequestState.canSendRequest(currentTimeMs)) { +NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs); +return new NetworkClientDelegate.PollResult(-1, Collections.singletonList(request)); +} + +return new NetworkClientDelegate.PollResult( +coordinatorRequestState.remainingBackoffMs(currentTimeMs), +new ArrayList<>()); +} + +private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) { +coordinatorRequestState.updateLastSend(currentTimeMs); +FindCoordinatorRequestData data = new FindCoordinatorRequestData() +.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id()) +
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423837 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; + +/** + * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations. 
+ */ +public class NetworkClientDelegate implements AutoCloseable { +private final KafkaClient client; +private final Time time; +private final Logger log; +private boolean wakeup = false; +private final Queue unsentRequests; + +public NetworkClientDelegate( +final Time time, +final LogContext logContext, +final KafkaClient client) { +this.time = time; +this.client = client; +this.log = logContext.logger(getClass()); +this.unsentRequests = new ArrayDeque<>(); +} + +public List poll(Timer timer, boolean disableWakeup) { +client.wakeup(); +if (!disableWakeup) { +// trigger wakeups after checking for disconnects so that the callbacks will be ready +// to be fired on the next call to poll() +maybeTriggerWakeup(); +} + +trySend(); +return this.client.poll(timer.timeoutMs(), time.milliseconds()); +} + +private void trySend() { +while (unsentRequests.size() > 0) { +UnsentRequest unsent = unsentRequests.poll(); +if (unsent.timer.isExpired()) { +// TODO: expired request should be marked +unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( +"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); +continue; +} + +if (!doSend(unsent)) { +log.debug("No broker available to send the request: {}", unsent); +unsent.callback.ifPresent(v -> v.onFailure( +new IllegalThreadStateException("No node available in the kafka cluster to send the request"))); +} +} +} + +static boolean isReady(KafkaClient client, Node node, long currentTime) { +client.poll(0, currentTime); +return client.isReady(node, currentTime); +} + +public boolean doSend(UnsentRequest r) { +long now = time.milliseconds(); +Node node = r.node.orElse(client.leastLoadedNode(now)); +if (node == null) { +return false; +} +ClientRequest request = makeClientRequest(r, node); +// TODO: Sounds like we need to check disconnections for each node and complete the request with +// authentication error +if (isReady(client, node, now)) { +client.send(request, now); +} +return true; +} + +private 
ClientRequest makeClientRequest(UnsentRequest unsent, Node node) { +return client.newClientRequest( +node.idString(), +unsent.abstractBuilder, +time.milliseconds(), +true, +// TODO: Determine if we want the actual request timeout here to be requestTimeoutMs - timeInUnsentQueue +(int)
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423520 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; + +/** + * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations. 
+ */ +public class NetworkClientDelegate implements AutoCloseable { +private final KafkaClient client; +private final Time time; +private final Logger log; +private boolean wakeup = false; +private final Queue unsentRequests; + +public NetworkClientDelegate( +final Time time, +final LogContext logContext, +final KafkaClient client) { +this.time = time; +this.client = client; +this.log = logContext.logger(getClass()); +this.unsentRequests = new ArrayDeque<>(); +} + +public List poll(Timer timer, boolean disableWakeup) { +client.wakeup(); +if (!disableWakeup) { +// trigger wakeups after checking for disconnects so that the callbacks will be ready +// to be fired on the next call to poll() +maybeTriggerWakeup(); +} + +trySend(); +return this.client.poll(timer.timeoutMs(), time.milliseconds()); +} + +private void trySend() { +while (unsentRequests.size() > 0) { +UnsentRequest unsent = unsentRequests.poll(); +if (unsent.timer.isExpired()) { +// TODO: expired request should be marked +unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( +"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); +continue; +} + +if (!doSend(unsent)) { +log.debug("No broker available to send the request: {}", unsent); +unsent.callback.ifPresent(v -> v.onFailure( +new IllegalThreadStateException("No node available in the kafka cluster to send the request"))); +} +} +} + +static boolean isReady(KafkaClient client, Node node, long currentTime) { +client.poll(0, currentTime); +return client.isReady(node, currentTime); +} + +public boolean doSend(UnsentRequest r) { +long now = time.milliseconds(); +Node node = r.node.orElse(client.leastLoadedNode(now)); +if (node == null) { +return false; +} +ClientRequest request = makeClientRequest(r, node); +// TODO: Sounds like we need to check disconnections for each node and complete the request with +// authentication error +if (isReady(client, node, now)) { +client.send(request, now); +} +return true; +} + +private 
ClientRequest makeClientRequest(UnsentRequest unsent, Node node) { +return client.newClientRequest( +node.idString(), +unsent.abstractBuilder, +time.milliseconds(), +true, +// TODO: Determine if we want the actual request timeout here to be requestTimeoutMs - timeInUnsentQueue +(int)
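The `trySend()` loop quoted above drains the unsent-request queue once per poll: expired requests are failed with a timeout, requests with no available node are failed immediately, and the rest are handed to the network client. A self-contained sketch of that drain pattern (plain Java, hypothetical names, no Kafka dependencies):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Illustrative version of the trySend() drain loop above. Names are not
// the actual Kafka implementation.
public class UnsentQueue {
    static final class Unsent {
        final long deadlineMs;            // absolute expiry time
        final Consumer<String> onFailure; // invoked if the request cannot be sent
        Unsent(long deadlineMs, Consumer<String> onFailure) {
            this.deadlineMs = deadlineMs;
            this.onFailure = onFailure;
        }
    }

    private final Queue<Unsent> unsent = new ArrayDeque<>();

    public void add(Unsent r) {
        unsent.add(r);
    }

    /** Drains the queue once; returns how many requests were handed to the sender. */
    public int trySend(long nowMs, Predicate<Unsent> sender) {
        int sent = 0;
        while (!unsent.isEmpty()) {
            Unsent r = unsent.poll();
            if (nowMs >= r.deadlineMs) {
                // Expired before it could be sent: fail and move on.
                r.onFailure.accept("request timed out before it could be sent");
                continue;
            }
            if (sender.test(r)) {
                sent++; // handed to the network layer
            } else {
                // Mirrors the quoted code's behavior when no node is available.
                r.onFailure.accept("no node available to send the request");
            }
        }
        return sent;
    }
}
```

One design consequence visible in the quoted code and preserved here: the queue is fully drained on every pass, so a request that cannot be sent is failed rather than silently retried on the next poll.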
[GitHub] [kafka] belugabehr commented on pull request #8066: KAFKA-4090: Validate SSL connection in client
belugabehr commented on PR #8066: URL: https://github.com/apache/kafka/pull/8066#issuecomment-1335664538 I documented my work on the ticket as well. It got very little attention from the Kafka Dev Team and seems to be pretty out of date now (merge conflicts). Will need someone internal to the project to take this baton and run with it. https://issues.apache.org/jira/browse/KAFKA-4090 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038422365

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
## @@ -0,0 +1,260 @@

```java
    public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
        client.wakeup();
        if (!disableWakeup) {
            // trigger wakeups after checking for disconnects so that the callbacks will be ready
            // to be fired on the next call to poll()
            maybeTriggerWakeup();
        }

        trySend();
        return this.client.poll(timer.timeoutMs(), time.milliseconds());
```

Review Comment: After we call `poll()`, we need to iterate through unsent requests and see if the node has been disconnected. Similar to `ConsumerNetworkClient.checkDisconnects`.
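A `checkDisconnects`-style pass, in the spirit of the `ConsumerNetworkClient.checkDisconnects` behavior the reviewer points to, could look roughly like the following self-contained sketch. All types and names here are simplified illustrative stand-ins, not the actual Kafka classes:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative stand-in for an unsent request with a failure callback.
class UnsentRequestSketch {
    final String nodeId;
    final Consumer<Exception> onFailure;
    UnsentRequestSketch(String nodeId, Consumer<Exception> onFailure) {
        this.nodeId = nodeId;
        this.onFailure = onFailure;
    }
}

class DisconnectCheckSketch {
    private final Queue<UnsentRequestSketch> unsentRequests = new ArrayDeque<>();
    private final Set<String> disconnectedNodes = new HashSet<>();

    void add(UnsentRequestSketch r) { unsentRequests.add(r); }
    void markDisconnected(String nodeId) { disconnectedNodes.add(nodeId); }

    // After poll(), walk the unsent queue and fail requests whose target node
    // has disconnected, so their callbacks fire promptly instead of waiting
    // for the request timeout to expire.
    int checkDisconnects() {
        int failed = 0;
        Iterator<UnsentRequestSketch> it = unsentRequests.iterator();
        while (it.hasNext()) {
            UnsentRequestSketch r = it.next();
            if (disconnectedNodes.contains(r.nodeId)) {
                it.remove();
                r.onFailure.accept(new RuntimeException("Node " + r.nodeId + " disconnected"));
                failed++;
            }
        }
        return failed;
    }
}
```

The real implementation would consult the network client's connection state rather than a hand-maintained set, and would raise a `DisconnectException` (or an authentication error where applicable).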
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423837

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
## @@ -0,0 +1,260 @@
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038418696

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
## @@ -0,0 +1,260 @@

```java
    private void trySend() {
        while (unsentRequests.size() > 0) {
            UnsentRequest unsent = unsentRequests.poll();
            if (unsent.timer.isExpired()) {
                // TODO: expired request should be marked
                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
                continue;
            }

            if (!doSend(unsent)) {
                log.debug("No broker available to send the request: {}", unsent);
                unsent.callback.ifPresent(v -> v.onFailure(
                        new IllegalThreadStateException("No node available in the kafka cluster to send the request")));
            }
        }
    }
```

Review Comment: Use retriable exception. Maybe `NETWORK_EXCEPTION`? Or a custom exception, `NoNodeAvailableException` or sth like that. An alternative is to add to the end of the queue? We don't need to back off. Or perhaps we can use a separate queue for requests that have no target node.
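One of the alternatives floated in the review — parking requests that currently have no target node on a separate queue instead of failing them with a non-retriable exception — might look like this simplified sketch. The class and member names are illustrative stand-ins, not the actual Kafka code:

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Illustrative stand-in for an unsent request with an optional target node.
class PendingRequestSketch {
    final Optional<String> targetNode;
    PendingRequestSketch(Optional<String> targetNode) { this.targetNode = targetNode; }
}

class SendQueueSketch {
    private final Queue<PendingRequestSketch> unsent = new ArrayDeque<>();
    // Secondary queue for requests that currently have no node to go to;
    // they are retried on a later poll instead of failing immediately.
    private final Queue<PendingRequestSketch> awaitingNode = new ArrayDeque<>();

    void enqueue(PendingRequestSketch r) { unsent.add(r); }

    // Drain the unsent queue; requests without a resolvable node are parked
    // rather than completed with a fatal (non-retriable) exception.
    void trySend(Optional<String> leastLoadedNode) {
        while (!unsent.isEmpty()) {
            PendingRequestSketch r = unsent.poll();
            String node = r.targetNode.orElse(leastLoadedNode.orElse(null));
            if (node == null) {
                awaitingNode.add(r); // no backoff needed, per the review comment
                continue;
            }
            // ... hand the request to the network client here ...
        }
    }

    int parked() { return awaitingNode.size(); }
}
```

Expiry handling would still apply to the parked queue, so requests that never find a node eventually fail with a `TimeoutException`.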
[jira] [Assigned] (KAFKA-14436) Initialize KRaft with arbitrary epoch
[ https://issues.apache.org/jira/browse/KAFKA-14436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alyssa Huang reassigned KAFKA-14436:
------------------------------------

    Assignee: Alyssa Huang

> Initialize KRaft with arbitrary epoch
> -------------------------------------
>
>                 Key: KAFKA-14436
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14436
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: David Arthur
>            Assignee: Alyssa Huang
>            Priority: Major
>
> For the ZK migration, we need to be able to initialize Raft with an arbitrarily high epoch (within the size limit). This is because during the migration, we want to write the Raft epoch as the controller epoch in ZK. We require that epochs in /controller_epoch are monotonic in order for brokers to behave normally.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
jolshan commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038385341

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
## @@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept

```java
        assertTrue(secondResponseFuture.isDone());
    }

    @ParameterizedTest
    @ValueSource(ints = {3, 7, 14, 51})
    public void testRetriableErrors(int errorCode) {
        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
        Errors error = Errors.forCode((short) errorCode);

        // Ensure FindCoordinator retries.
        TransactionalRequestResult result = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));

        // Ensure InitPid retries.
        prepareInitPidResponse(error, false, producerId, epoch);
        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
        runUntil(transactionManager::hasProducerId);

        result.await();
        transactionManager.beginTransaction();

        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));

        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.

        // Ensure EndTxn retries.
        TransactionalRequestResult abortResult = transactionManager.beginCommit();
        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
        runUntil(abortResult::isCompleted);
        assertTrue(abortResult.isSuccessful());
    }

    @Test
    public void testCoordinatorNotAvailable() {
```

Review Comment: Oops, I added it to that test. I will revert that change.
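For readers decoding the `@ValueSource` ints above: matching them against the errors named in the test's own comment (and the Kafka protocol error table) gives the mapping below. A minimal sketch of that correspondence, offered as an illustration rather than a normative table:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the error-code-to-name mapping exercised by testRetriableErrors.
// In the real test, Errors.forCode((short) errorCode) resolves these at runtime.
class RetriableErrorCodes {
    static final Map<Integer, String> CODES = new LinkedHashMap<>();
    static {
        CODES.put(3, "UNKNOWN_TOPIC_OR_PARTITION");
        CODES.put(7, "REQUEST_TIMED_OUT");
        CODES.put(14, "COORDINATOR_LOAD_IN_PROGRESS");
        CODES.put(51, "CONCURRENT_TRANSACTIONS"); // substituted for AddPartitionsToTxn in the test
    }
}
```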
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1038378151

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
## @@ -0,0 +1,272 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class CoordinatorRequestManager implements RequestManager {

    private final Logger log;
    private final Time time;
    private final long requestTimeoutMs;
    private final ErrorEventHandler errorHandler;
    private final long rebalanceTimeoutMs;
    private final Optional<String> groupId;

    private final CoordinatorRequestState coordinatorRequestState;
    private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
    private Node coordinator;

    public CoordinatorRequestManager(final Time time,
                                     final LogContext logContext,
                                     final ConsumerConfig config,
                                     final ErrorEventHandler errorHandler,
                                     final Optional<String> groupId,
                                     final long rebalanceTimeoutMs) {
        this.time = time;
        this.log = logContext.logger(this.getClass());
        this.errorHandler = errorHandler;
        this.groupId = groupId;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        this.coordinatorRequestState = new CoordinatorRequestState(config);
    }

    CoordinatorRequestManager(final Time time,
                              final LogContext logContext,
                              final ErrorEventHandler errorHandler,
                              final Optional<String> groupId,
                              final long rebalanceTimeoutMs,
                              final long requestTimeoutMs,
                              final CoordinatorRequestState coordinatorRequestState) {
        this.time = time;
        this.log = logContext.logger(this.getClass());
        this.errorHandler = errorHandler;
        this.groupId = groupId;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.coordinatorRequestState = coordinatorRequestState;
    }

    @Override
    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
            NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs);
            return new NetworkClientDelegate.PollResult(-1, Collections.singletonList(request));
        }

        return new NetworkClientDelegate.PollResult(
            coordinatorRequestState.remainingBackoffMs(currentTimeMs),
            new ArrayList<>());
    }

    private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) {
        coordinatorRequestState.updateLastSend(currentTimeMs);
        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
            .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
```
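The `CoordinatorRequestState` consulted in `poll()` above gates how often a FindCoordinator request may be attempted. A self-contained sketch of that kind of request-state gate, using a simple exponential backoff (the real class is configured from `ConsumerConfig`; the constants and doubling policy below are illustrative assumptions):

```java
// Minimal request-state gate with exponential backoff between attempts.
// Constants and the doubling policy are illustrative, not the consumer defaults.
class RequestStateSketch {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private long lastSentMs = -1;
    private int attempts = 0;

    RequestStateSketch(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // Backoff doubles with each attempt, capped at maxBackoffMs.
    private long currentBackoffMs() {
        if (attempts == 0) return 0;
        long backoff = initialBackoffMs * (1L << Math.min(attempts - 1, 20));
        return Math.min(backoff, maxBackoffMs);
    }

    boolean canSendRequest(long nowMs) {
        return lastSentMs == -1 || nowMs - lastSentMs >= currentBackoffMs();
    }

    // How long poll() should report before the next attempt is allowed.
    long remainingBackoffMs(long nowMs) {
        return Math.max(0, currentBackoffMs() - (nowMs - lastSentMs));
    }

    void updateLastSend(long nowMs) {
        lastSentMs = nowMs;
        attempts++;
    }
}
```

When `canSendRequest` is false, `poll()` returns the remaining backoff instead of a request, mirroring the two branches in the method above.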
[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
jolshan commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038368961

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
## @@ -1678,6 +1680,62 @@

```java
    @Test
    public void testCoordinatorNotAvailable() {
```

Review Comment: I suppose we have `testLookupCoordinatorOnNotCoordinatorError` Is this what you were thinking of?
[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
hachikuji commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038367059

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:

```
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }

+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
```

Review Comment: Sorry, I realized this is not a valid error for `FindCoordinator`, so nevermind.
[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
jolshan commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038365461

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:

```
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }

+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
```

Review Comment: I thought about this, but I didn't know how to make the errors from strings.
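Resolving an enum constant from a string is what `Enum.valueOf` does, and it is essentially how JUnit's `@EnumSource(names = ...)` selects constants; the int-based variant in the test instead maps a wire code to a constant, as Kafka's `Errors.forCode` does. A toy sketch of both lookups (`DemoErrors` and its codes are illustrative stand-ins, not Kafka's `Errors` class):

```java
/** Toy enum mimicking how Kafka's Errors maps short codes to constants. */
enum DemoErrors {
    UNKNOWN_TOPIC_OR_PARTITION(3),
    REQUEST_TIMED_OUT(7),
    COORDINATOR_LOAD_IN_PROGRESS(14),
    CONCURRENT_TRANSACTIONS(51);

    final short code;

    DemoErrors(int code) {
        this.code = (short) code;
    }

    // Linear scan from code to constant, like Errors.forCode.
    static DemoErrors forCode(short code) {
        for (DemoErrors e : values()) {
            if (e.code == code) {
                return e;
            }
        }
        throw new IllegalArgumentException("unknown code " + code);
    }
}

public class EnumLookup {
    public static void main(String[] args) {
        // From a string, as @EnumSource(names = ...) does internally:
        System.out.println(DemoErrors.valueOf("REQUEST_TIMED_OUT")); // REQUEST_TIMED_OUT
        // From a numeric code, as the @ValueSource(ints = ...) test does:
        System.out.println(DemoErrors.forCode((short) 51)); // CONCURRENT_TRANSACTIONS
    }
}
```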
[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
hachikuji commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:

```
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }

+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
```

Review Comment: I think another way to do this is like this:
```java
@EnumSource(names = {
    "UNKNOWN_TOPIC_OR_PARTITION",
    "REQUEST_TIMED_OUT",
    "COORDINATOR_LOAD_IN_PROGRESS",
    "CONCURRENT_TRANSACTIONS"
})
public void testRetriableErrors(Errors error) {
```
[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
hachikuji commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:

```
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }

+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
```

Review Comment: I think another way to do this is like this:
```java
@EnumSource(names = {
    "UNKNOWN_TOPIC_OR_PARTITION",
    "REQUEST_TIMED_OUT",
    "COORDINATOR_LOAD_IN_PROGRESS",
    "CONCURRENT_TRANSACTIONS"
})
```

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:

```
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }

+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+    }
+
```

Review Comment: nit: extra new line

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:

```
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }

+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+
```
[GitHub] [kafka] jonathan-albrecht-ibm commented on pull request #12343: MINOR: Update unit/integration tests to work with the IBM Semeru JDK
jonathan-albrecht-ibm commented on PR #12343: URL: https://github.com/apache/kafka/pull/12343#issuecomment-1335553986 Thanks @mimaison!
[jira] [Updated] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Grant updated KAFKA-14437:
Description: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when the place method is called. This means newly added partitions may get the same assignments as existing partitions. This differs from AdminUtils, which has some logic to shift where in the list of brokers we start making assignments from for newly added partitions. For example, let's say we had the following:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
{code}
CreateTopics might return the following assignment for two partitions:
{code:java}
P0: 6, 8, 2
P1: 9, 3, 7
{code}
If the user then calls CreatePartitions, increasing the partition count to 4, StripedReplicaPlacer does not take P0 and P1 into account. It creates a random rack offset and a random broker offset, so it could easily create the same assignments for the new partitions that it created for P0 and P1. This is easily reproduced in a unit test.

My suggestion is to enhance StripedReplicaPlacer to account for existing partition assignments. Intuitively, we'd like to make assignments for added partitions from "where we left off" when making the previous assignments. In practice, it's not possible to know exactly what the state was during the previous partition assignments because, for example, brokers' fencing state may have changed. But we can make a best-effort attempt that is optimized for the common case where most brokers are unfenced.

Note, all the changes suggested below only affect StripedReplicaPlacer when place is called and there are existing partition assignments, which happens when it is servicing CreatePartitions requests. If there are no existing partition assignments, which happens during CreateTopics, the logic is unchanged.
First, we need to update ClusterDescriber to:
{code:java}
public interface ClusterDescriber {
    /**
     * Get an iterator through the usable brokers.
     */
    Iterator<UsableBroker> usableBrokers();

    List<List<Integer>> replicasForTopicName(String topicName);
}
{code}
replicasForTopicName returns the existing partition assignments, which enables StripedReplicaPlacer to know about existing partition assignments when they exist.

When place is called, some initialization is done in both RackList and BrokerList. One thing that is initialized is the offset variable: a variable used in both RackList and BrokerList that determines where in the list of racks or brokers, respectively, we should start from when making the next assignment. Currently, it is initialized to a random value based on the size of the list. I suggest we add logic during initialization that sets the offset for both RackList and BrokerList to a value based on the previous assignments. Consider again the following rack metadata and existing assignments:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11

P0: 6, 8, 2
P1: 9, 3, 7
{code}
Let's imagine a user wants to create a new partition, P3. First, we need to determine which rack to start from for P3: this corresponds to the initial offset in RackList. We can look at the leader of P1 (not P0, because P1 is the "last" partition we made an assignment for) and see it's on rack 3, so the next rack we should start from is rack 1. This means we set the offset in RackList to 0, instead of a random value, during initialization. Second, we need to determine which broker to start from {_}per rack{_}: this corresponds to the initial offset in BrokerList. We can look at all the existing partition assignments, P0 and P1 in our example, and _per rack_ infer the last offset started from during previous assignments.
For each rack, we do this by iterating through the partitions in reverse order (because we care about the most recent starting position) and finding the first broker in the assignment that belongs to that rack. This tells us where we last started from when making an assignment for that rack, which determines where to continue from within that rack. So in our example, for rack 1 we can see the last broker we started from was broker 3 in P1, so the next broker we should choose for that rack is 0, which means the initial offset is set to 0 in the BrokerList for rack 1 during initialization. For rack 2, the last broker we started with was broker 7 in P1, so the next broker should be 4, which means the offset is 0 in the BrokerList for rack 2. For rack 3, the last broker we started with was broker 9 in P1, so the next broker should be 10, which means the offset is 2 in the BrokerList for rack 3.

was: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration
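The offset initialization described above can be sketched as follows. This is a hypothetical illustration of the proposal (the method names and structure are mine, not `StripedReplicaPlacer`'s), reproducing the worked example's results:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the proposed offset initialization from prior assignments. */
public class PlacementOffsets {
    // Rack index to start the next assignment from: the rack after the one
    // holding the last partition's leader, wrapping around.
    static int nextRackOffset(List<List<Integer>> racks, List<List<Integer>> assignments) {
        int leader = assignments.get(assignments.size() - 1).get(0);
        for (int r = 0; r < racks.size(); r++) {
            if (racks.get(r).contains(leader)) {
                return (r + 1) % racks.size();
            }
        }
        return 0; // leader's rack unknown; fall back to the first rack
    }

    // Per-rack broker offset: scan assignments newest-first, find the first
    // broker of that assignment living on the rack, and start just after it.
    static int nextBrokerOffset(List<Integer> rack, List<List<Integer>> assignments) {
        for (int p = assignments.size() - 1; p >= 0; p--) {
            for (int broker : assignments.get(p)) {
                int idx = rack.indexOf(broker);
                if (idx >= 0) {
                    return (idx + 1) % rack.size();
                }
            }
        }
        return 0; // no prior assignment touched this rack
    }

    public static void main(String[] args) {
        List<List<Integer>> racks = Arrays.asList(
            Arrays.asList(0, 1, 2, 3),
            Arrays.asList(4, 5, 6, 7),
            Arrays.asList(8, 9, 10, 11));
        List<List<Integer>> assignments = Arrays.asList(
            Arrays.asList(6, 8, 2),   // P0
            Arrays.asList(9, 3, 7));  // P1
        System.out.println(nextRackOffset(racks, assignments));          // 0 (start from rack 1)
        System.out.println(nextBrokerOffset(racks.get(0), assignments)); // 0 (next broker 0)
        System.out.println(nextBrokerOffset(racks.get(1), assignments)); // 0 (next broker 4)
        System.out.println(nextBrokerOffset(racks.get(2), assignments)); // 2 (next broker 10)
    }
}
```

These match the offsets derived by hand in the description: rack offset 0, and broker offsets 0, 0, and 2 for racks 1, 2, and 3.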
[jira] [Updated] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14437: - Description: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when the place method is called. This means for new partitions added, they may get the same assignments as existing partitions. This differs from AdminUtils, which has some logic to try and shift where in the list of brokers we start making assignments from for new partitions added. For example, lets say we had the following {code:java} Rack 1: 0, 1, 2, 3 Rack 2: 4, 5, 6, 7 Rack 3: 8, 9, 10, 11 {code} CreateTopics might return the following assignment for two partitions: {code:java} P0: 6, 8, 2 P1: 9, 3, 7 {code} If the user then calls CreatePartitions increasing the partition count to 4, StripedReplicaPlacer does not take into account P0 and P1. It creates a random rack offset and a random broker offset. So it could easily create the same assignment for P3 and P4 that it created for P0 and P1. This is easily reproduced in a unit test. My suggestion is to enhance StripedReplicaPlacer to account for existing partition assignments. Intuitively, we’d like to make assignments for added partitions from “where we left off” when we were making the previous assignments. In practice, its not possible to know exactly what the state was during the previous partition assignments because, for example, brokers fencing state may have changed. But I do think we can make a best effort attempt to do so that is optimized for the common case where most brokers are unfenced. Note, all the changes suggested below only will affect StripedReplicaPlacer when place is called and there are existing partition assignments, which happens when its servicing CreatePartitions requests. If there are no existing partition assignments, which happens during CreateTopics, the logic is unchanged. 
First, we need to update ClusterDescriber to: {code:java} public interface ClusterDescriber { /** * Get an iterator through the usable brokers. */ Iterator usableBrokers(); List> replicasForTopicName(String topicName); } {code} The replicasForTopicName returns the existing partition assignments. This will enable StripedReplicaPlacer to know about existing partition assignments when they exist. When place is called, some initialization is done in both RackList and BrokerList. One thing that is initialized is the offset variable - this is a variable used in both RackList and BrokerList that determines where in the list of either racks or brokers respectively we should start from when making the next assignment. Currently, it is initialized to a random value, based off the size of the list. I suggest we add some logic during initialization that sets the offset for both RackList and BrokerList to a value based off the previous assignments. Consider again the following rack metadata and existing assignments: {code:java} Rack 1: 0, 1, 2, 3 Rack 2: 4, 5, 6, 7 Rack 3: 8, 9, 10, 11 P0: 6, 8, 2 P1: 9, 3, 7 {code} Lets imagine a user wants to create a new partition, P3. First, we need to determine which rack to start from for P3: this corresponds to the initial offset in RackList. We can look at the leader of P1 (not P0 because P1 is the “last” partition we made an assignment for) and see its on rack 3. So, the next rack we should start from should be rack 1. This means we set offset in RackList to 0, instead of a random value, during initialization. Second, we need to determine which broker to start from {_}per rack{_}: this corresponds to the initial offset in BrokerList. We can look at all the existing partition assignments, P0 and P1 in our example, and _per rack_ infer the last offset started from during previous assignments. 
For each rack, we do this by iterating through each partition, in reverse order because we care about the most recent starting position, and try to find the first broker in the assignment from the rack. This enables us to know where we last started from when making an assignment for that rack, which can be used to determine where to continue on from within that rack. So in our example, for rack 1 we can see the last broker we started from was broker 3 in P1: so the next broker we should choose for that rack should be 0 which means the initial offset is set to 0 in the BrokerList for rack 1 during initialization. For rack 2 we can see the last broker we started with was broker 7 in P1: so the next broker should be 4 which means the offset is 0 in the BrokerList for rack 2. For rack 3 we can see the last broker we started with was broker 9 in P1: so the next broker should be 10 which means the offset is 2 in the BrokerList for rack 3. was: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when
[jira] [Updated] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14437: - Description: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when the place method is called. This means for new partitions added, they may get the same assignments as existing partitions. This differs from AdminUtils, which has some logic to try and shift where in the list of brokers we start making assignments from for new partitions added. For example, lets say we had the following {code:java} Rack 1: 0, 1, 2, 3 Rack 2: 4, 5, 6, 7 Rack 3: 8, 9, 10, 11 {code} CreateTopics might return the following assignment for two partitions: {code:java} P0: 6, 8, 2 P1: 9, 3, 7 {code} If the user then calls CreatePartitions increasing the partition count to 4, StripedReplicaPlacer does not take into account P0 and P1. It creates a random rack offset and a random broker offset. So it could easily create the same assignment for P3 and P4 that it created for P0 and P1. This is easily reproduced in a unit test. My suggestion is to enhance StripedReplicaPlacer to account for existing partition assignments. Intuitively, we’d like to make assignments for added partitions from “where we left off” when we were making the previous assignments. In practice, its not possible to know exactly what the state was during the previous partition assignments because, for example, brokers fencing state may have changed. But I do think we can make a best effort attempt to do so that is optimized for the common case where most brokers are unfenced. Note, all the changes suggested below only will affect StripedReplicaPlacer when place is called and there are existing partition assignments, which happens when its servicing CreatePartitions requests. If there are no existing partition assignments, which happens during CreateTopics, the logic is unchanged. 
First, we need to update ClusterDescriber to: {code:java} public interface ClusterDescriber { /** * Get an iterator through the usable brokers. */ Iterator usableBrokers(); List> replicasForTopicName(String topicName); } {code} The replicasForTopicName returns the existing partition assignments. This will enable StripedReplicaPlacer to know about existing partition assignments when they exist. When place is called, some initialization is done in both RackList and BrokerList. One thing that is initialized is the offset variable - this is a variable used in both RackList and BrokerList that determines where in the list of either racks or brokers respectively we should start from when making the next assignment. Currently, it is initialized to a random value, based off the size of the list. I suggest we add some logic during initialization that sets the offset for both RackList and BrokerList to a value based off the previous assignments. Consider again the following rack metadata and existing assignments: {code:java} Rack 1: 0, 1, 2, 3 Rack 2: 4, 5, 6, 7 Rack 3: 8, 9, 10, 11 P0: 6, 8, 2 P1: 9, 3, 7 {code} Lets imagine a user wants to create a new partition, P3. First, we need to determine which rack to start from for P3: this corresponds to the initial offset in RackList. We can look at the leader of P1 (not P0 because P1 is the “last” partition we made an assignment for) and see its on rack 3. So, the next rack we should start from should be rack 1. This means we set offset in RackList to 0, instead of a random value, during initialization. Second, we need to determine which broker to start from {_}per rack{_}: this corresponds to the initial offset in BrokerList. We can look at all the existing partition assignments, P0 and P1 in our example, and _per rack_ infer the last offset started from during previous assignments. 
For each rack, we do this by iterating through each partition, in reverse order because we care about the most recent starting position, and try to find the first broker in the assignment from the rack. This enables us to know where we last started from when making an assignment for that rack, which can be used to determine where to continue on from. So in our example, for rack 1 we can see the last broker we started from was broker 3 in P1: so the next broker we should choose for that rack should be 0 which means the initial offset is set to 0 in the BrokerList for rack 1 during initialization. For rack 2 we can see the last broker we started with was broker 7 in P1: so the next broker should be 4 which means the offset is 0 in the BrokerList for rack 2. For rack 3 we can see the last broker we started with was was broker 9 in P1: so the next broker should be 10 which means the offset is 2 in the BrokerList for rack 3. was: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when the place
[jira] [Updated] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14437: - Description: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when the place method is called. This means for new partitions added, they may get the same assignments as existing partitions. This differs from AdminUtils, which has some logic to try and shift where in the list of brokers we start making assignments from for new partitions added. For example, lets say we had the following {code:java} Rack 1: 0, 1, 2, 3 Rack 2: 4, 5, 6, 7 Rack 3: 8, 9, 10, 11 {code} CreateTopics might return the following assignment for two partitions: {code:java} P0: 6, 8, 2 P1: 9, 3, 7 {code} If the user then calls CreatePartitions increasing the partition count to 4, StripedReplicaPlacer does not take into account P0 and P1. It creates a random rack offset and a random broker offset. So it could easily create the same assignment for P3 and P4 that it created for P0 and P1. This is easily reproduced in a unit test. My suggestion is to enhance StripedReplicaPlacer to account for existing partition assignments. Intuitively, we’d like to make assignments for added partitions from “where we left off” when we were making the previous assignments. In practice, its not possible to know exactly what the state was during the previous partition assignments because, for example, brokers fencing state may have changed. But I do think we can make a best effort attempt to do so that is optimized for the common case where most brokers are unfenced. Note, all the changes suggested below only will affect StripedReplicaPlacer when place is called and there are existing partition assignments, which happens when its servicing CreatePartitions requests. If there are no existing partition assignments, which happens during CreateTopics, the logic is unchanged. 
First, we need to update ClusterDescriber to: {code:java} public interface ClusterDescriber { /** * Get an iterator through the usable brokers. */ Iterator usableBrokers(); List> replicasForTopicName(String topicName); } {code} The replicasForTopicName returns the existing partition assignments. This will enable StripedReplicaPlacer to know about existing partition assignments when they exist. When place is called, some initialization is done in both RackList and BrokerList. One thing that is initialized is the offset variable - this is a variable used in both RackList and BrokerList that determines where in the list of either racks or brokers respectively we should start from when making the next assignment. Currently, it is initialized to a random value, based off the size of the list. I suggest we add some logic during initialization that sets the offset for both RackList and BrokerList to a value based off the previous assignments. Consider again the following rack metadata and existing assignments: {code:java} Rack 1: 0, 1, 2, 3 Rack 2: 4, 5, 6, 7 Rack 3: 8, 9, 10, 11 P0: 6, 8, 2 P1: 9, 3, 7 {code} Lets imagine a user wants to create a new partition, P3. First, we need to determine which rack to start from for P3: this corresponds to the initial offset in RackList. We can look at the leader of P1 (not P0 because P1 is the “last” partition we made an assignment for) and see its on rack 3. So, the next rack we should start from should be rack 1. This means we set offset in RackList to 0, instead of a random value, during initialization. Second, we need to determine which broker to start from {_}per rack{_}: this corresponds to the initial offset in BrokerList. We can look at all the existing partition assignments, P0 and P1 in our example, and _per rack_ infer the last offset started from during previous assignments. 
For each rack, we do this by iterating through each partition, in reverse order because we care about the most recent starting position, and try to find the first broker in the assignment. This enables us to know where we last started from when making an assignment for that rack, which can be used to determine where to continue on from. So in our example, for rack 1 we can see the last broker we started from was broker 3 in P1: so the next broker we should choose for that rack should be 0 which means the initial offset is set to 0 in the BrokerList for rack 1 during initialization. For rack 2 we can see the last broker we started with was broker 7 in P1: so the next broker should be 4 which means the offset is 0 in the BrokerList for rack 2. For rack 3 we can see the last broker we started with was was broker 9 in P1: so the next broker should be 10 which means the offset is 2 in the BrokerList for rack 3. was: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when the place method is
[jira] [Updated] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14437: - Description: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when the place method is called. This means for new partitions added, they may get the same assignments as existing partitions. This differs from AdminUtils, which has some logic to try and shift where in the list of brokers we start making assignments from for new partitions added. For example, lets say we had the following {code:java} Rack 1: 0, 1, 2, 3 Rack 2: 4, 5, 6, 7 Rack 3: 8, 9, 10, 11 {code} CreateTopics might return the following assignment for two partitions: {code:java} P0: 6, 8, 2 P1: 9, 3, 7 {code} If the user then calls CreatePartitions increasing the partition count to 4, StripedReplicaPlacer does not take into account P0 and P1. It creates a random rack offset and a random broker offset. So it could easily create the same assignment for P3 and P4 that it created for P0 and P1. This is easily reproduced in a unit test. My suggestion is to enhance StripedReplicaPlacer to account for existing partition assignments. Intuitively, we’d like to make assignments for added partitions from “where we left off” when we were making the previous assignments. In practice, its not possible to know exactly what the state was during the previous partition assignments because, for example, brokers fencing state may have changed. But I do think we can make a best effort attempt to do so that is optimized for the common case where most brokers are unfenced. Note, all the changes suggested below only will affect StripedReplicaPlacer when place is called and there are existing partition assignments, which happens when its servicing CreatePartitions requests. If there are no existing partition assignments, which happens during CreateTopics, the logic is unchanged. 
First, we need to update ClusterDescriber to: {code:java} public interface ClusterDescriber { /** * Get an iterator through the usable brokers. */ Iterator<UsableBroker> usableBrokers(); List<List<Integer>> replicasForTopicName(String topicName); } {code} The replicasForTopicName method returns the existing partition assignments. This will enable StripedReplicaPlacer to know about existing partition assignments when they exist. When place is called, some initialization is done in both RackList and BrokerList. One thing that is initialized is the offset variable - a variable used in both RackList and BrokerList that determines where in the list of racks or brokers, respectively, we should start from when making the next assignment. Currently, it is initialized to a random value based on the size of the list. I suggest we add some logic during initialization that sets the offset for both RackList and BrokerList to a value based on the previous assignments. Consider again the following rack metadata and existing assignments: {code:java} Rack 1: 0, 1, 2, 3 Rack 2: 4, 5, 6, 7 Rack 3: 8, 9, 10, 11 P0: 6, 8, 2 P1: 9, 3, 7 {code} Let's imagine a user wants to create a new partition, P3. First, we need to determine which rack to start from for P3: this corresponds to the initial offset in RackList. We can look at the leader of P1 (not P0, because P1 is the "last" partition we made an assignment for) and see it's on rack 3. So the next rack we should start from should be rack 1. This means we set the offset in RackList to 0, instead of a random value, during initialization. Second, we need to determine which broker to start from {_}per rack{_}: this corresponds to the initial offset in BrokerList. We can look at all the existing partition assignments, P0 and P1 in our example, and _per rack_ infer the last offset started from during previous assignments. 
For each rack, we do this by iterating through each partition, in reverse order because we care about the most recent starting position, and trying to find the first broker in the assignment. This tells us where we last started from when making an assignment for that rack, which can be used to determine where to continue from. So in our example, for rack 1 we can see the last broker we started from was broker 3 in P1: the next broker we should choose for that rack should be 0, which means the initial offset is set to 0 in the BrokerList for rack 1 during initialization. For rack 2 we can see the last broker we started with was broker 7 in P1: the next broker should be 4, which means the offset is 0 in the BrokerList for rack 2. For rack 3 we can see the last broker we started with was broker 9 in P1: the next broker should be 10, which means the offset is 2 in the BrokerList for rack 3. was: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration when the place method is called.
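The offset-inference procedure described above can be sketched as a small standalone example. The class and method names (`OffsetInference`, `nextRackOffset`, `nextBrokerOffset`) and the plain-`List` data layout are hypothetical illustrations for this sketch, not the actual StripedReplicaPlacer code; it reproduces the offsets worked out in the example:

```java
import java.util.List;

public class OffsetInference {
    // Rack offset: find the rack of the last partition's leader and start
    // from the rack after it.
    static int nextRackOffset(List<List<Integer>> racks, List<List<Integer>> assignments) {
        int leader = assignments.get(assignments.size() - 1).get(0);
        for (int r = 0; r < racks.size(); r++) {
            if (racks.get(r).contains(leader)) {
                return (r + 1) % racks.size();
            }
        }
        return 0; // leader's rack unknown; fall back to the first rack
    }

    // Broker offset for one rack: walk partitions newest-first and take the
    // first replica that lives on this rack as where the previous placement
    // started for it, then continue from the broker after that one.
    static int nextBrokerOffset(List<Integer> rack, List<List<Integer>> assignments) {
        for (int p = assignments.size() - 1; p >= 0; p--) {
            for (int broker : assignments.get(p)) {
                int idx = rack.indexOf(broker);
                if (idx >= 0) {
                    return (idx + 1) % rack.size();
                }
            }
        }
        return 0; // no prior assignment touched this rack
    }

    public static void main(String[] args) {
        List<List<Integer>> racks = List.of(
                List.of(0, 1, 2, 3),      // rack 1
                List.of(4, 5, 6, 7),      // rack 2
                List.of(8, 9, 10, 11));   // rack 3
        List<List<Integer>> assignments = List.of(
                List.of(6, 8, 2),         // P0
                List.of(9, 3, 7));        // P1
        System.out.println(nextRackOffset(racks, assignments));          // 0 -> start from rack 1
        System.out.println(nextBrokerOffset(racks.get(0), assignments)); // 0 -> broker 0 next on rack 1
        System.out.println(nextBrokerOffset(racks.get(1), assignments)); // 0 -> broker 4 next on rack 2
        System.out.println(nextBrokerOffset(racks.get(2), assignments)); // 2 -> broker 10 next on rack 3
    }
}
```

Each resulting offset matches the initialization values derived in the description (RackList offset 0; BrokerList offsets 0, 0, and 2 for racks 1-3).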
[jira] [Commented] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642602#comment-17642602 ] Andrew Grant commented on KAFKA-14437: -- I created a draft PR, [https://github.com/apache/kafka/pull/12943], that illustrates the idea. > Enhance StripedReplicaPlacer to account for existing partition assignments > -- > > Key: KAFKA-14437 > URL: https://issues.apache.org/jira/browse/KAFKA-14437 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Priority: Major
[GitHub] [kafka] andymg3 opened a new pull request, #12943: KAFKA-14437: Enhance StripedReplicaPlacer to account for existing partition assignments
andymg3 opened a new pull request, #12943: URL: https://github.com/apache/kafka/pull/12943 ### Details This is a draft PR showing a potential solution to https://issues.apache.org/jira/browse/KAFKA-14437. The idea is to enhance StripedReplicaPlacer to use previous partition assignments to set the initial RackList and BrokerList state so that we don't create the same assignments for new partitions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
Andrew Grant created KAFKA-14437: Summary: Enhance StripedReplicaPlacer to account for existing partition assignments Key: KAFKA-14437 URL: https://issues.apache.org/jira/browse/KAFKA-14437 Project: Kafka Issue Type: Improvement Reporter: Andrew Grant
[jira] [Resolved] (KAFKA-14358) Users should not be able to create a regular topic name __cluster_metadata
[ https://issues.apache.org/jira/browse/KAFKA-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio resolved KAFKA-14358. Resolution: Fixed > Users should not be able to create a regular topic name __cluster_metadata > -- > > Key: KAFKA-14358 > URL: https://issues.apache.org/jira/browse/KAFKA-14358 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Blocker > Fix For: 3.4.0, 3.3.2 > > > The following test passes and it should not: > {code:java} > $ git diff > diff --git > a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > index 57834234cc..14b1435d00 100644 > --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > @@ -102,6 +102,12 @@ class CreateTopicsRequestTest extends > AbstractCreateTopicsRequestTest { > validateTopicExists("partial-none") > } > > + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) > + @ValueSource(strings = Array("zk", "kraft")) > + def testClusterMetadataTopicFails(quorum: String): Unit = { > + createTopic("__cluster_metadata", 1, 1) > + } > + > @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) > @ValueSource(strings = Array("zk")) > def testCreateTopicsWithVeryShortTimeouts(quorum: String): Unit = {{code} > Result of this test: > {code:java} > $ ./gradlew core:test --tests > CreateTopicsRequestTest.testClusterMetadataTopicFails > > Configure project : > Starting build with version 3.4.0-SNAPSHOT (commit id bc780c7c) using Gradle > 7.5.1, Java 1.8 and Scala 2.13.8 > Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0 > > Task :core:test > Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest > > 
testClusterMetadataTopicFails(String) > > kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[1] > PASSED > Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest > > testClusterMetadataTopicFails(String) > > kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[2] > PASSED > BUILD SUCCESSFUL in 44s > 44 actionable tasks: 3 executed, 41 up-to-date > {code} > I think that this test should fail in both KRaft and ZK. We want this to fail > in ZK so that it can be migrated to KRaft. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac merged pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface
dajac merged PR #12847: URL: https://github.com/apache/kafka/pull/12847
[GitHub] [kafka] zigarn commented on a diff in pull request #12175: KAFKA-14146: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)
zigarn commented on code in PR #12175: URL: https://github.com/apache/kafka/pull/12175#discussion_r1038293918 ## core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala: ## @@ -488,6 +488,31 @@ class ConsoleConsumerTest { assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey) } + @Test + def testCustomConfigShouldBePassedToConfigureMethod(): Unit = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +propsStream.write("key.deserializer.my-props=abc\n".getBytes()) +propsStream.write("print.key=false".getBytes()) +propsStream.close() Review Comment: I'll propose another PR soon.
[GitHub] [kafka] zigarn commented on pull request #12175: KAFKA-14146: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)
zigarn commented on PR #12175: URL: https://github.com/apache/kafka/pull/12175#issuecomment-1335472177 No problem, thanks for the review.
[GitHub] [kafka] dajac merged pull request #12175: KAFKA-14146: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)
dajac merged PR #12175: URL: https://github.com/apache/kafka/pull/12175
[GitHub] [kafka] omkreddy merged pull request #12896: KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers
omkreddy merged PR #12896: URL: https://github.com/apache/kafka/pull/12896
[GitHub] [kafka] dajac commented on a diff in pull request #12175: KAFKA-14146: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)
dajac commented on code in PR #12175: URL: https://github.com/apache/kafka/pull/12175#discussion_r1038279577 ## core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala: ## @@ -488,6 +488,31 @@ class ConsoleConsumerTest { assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey) } + @Test + def testCustomConfigShouldBePassedToConfigureMethod(): Unit = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +propsStream.write("key.deserializer.my-props=abc\n".getBytes()) +propsStream.write("print.key=false".getBytes()) +propsStream.close() Review Comment: I would rather prefer to have something like `TestUtils.tempPropertyFile(Map("parse.key" -> "true", "key.separator" -> "|"))`. However, we can do this in a separate PR.
[jira] [Created] (KAFKA-14436) Initialize KRaft with arbitrary epoch
David Arthur created KAFKA-14436: Summary: Initialize KRaft with arbitrary epoch Key: KAFKA-14436 URL: https://issues.apache.org/jira/browse/KAFKA-14436 Project: Kafka Issue Type: Sub-task Reporter: David Arthur For the ZK migration, we need to be able to initialize Raft with an arbitrarily high epoch (within the size limit). This is because during the migration, we want to write the Raft epoch as the controller epoch in ZK. We require that epochs in /controller_epoch are monotonic in order for brokers to behave normally.
[jira] [Commented] (KAFKA-14429) Move OffsetStorageReader from storage package to source package
[ https://issues.apache.org/jira/browse/KAFKA-14429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642565#comment-17642565 ] Chris Egerton commented on KAFKA-14429: --- Although I agree that the package name isn't ideal and there are better options, the cost of a migration to a new package name for a class that's been part of the public API for several years now might be too high to improve things at this point. > Move OffsetStorageReader from storage package to source package > --- > > Key: KAFKA-14429 > URL: https://issues.apache.org/jira/browse/KAFKA-14429 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Greg Harris >Priority: Minor > Labels: needs-kip > > The OffsetStorageReader is an interface provided to source connectors. This > does not fit with the broader context of the `storage` package, which is > focused on sink/source-agnostic converters and serialization/deserialization. > The current interface should be deprecated and extend from the relocated > interface in a different package.
[jira] [Updated] (KAFKA-14429) Move OffsetStorageReader from storage package to source package
[ https://issues.apache.org/jira/browse/KAFKA-14429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14429: -- Labels: needs-kip (was: ) > Move OffsetStorageReader from storage package to source package > --- > > Key: KAFKA-14429 > URL: https://issues.apache.org/jira/browse/KAFKA-14429 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Greg Harris >Priority: Minor > Labels: needs-kip
[jira] [Updated] (KAFKA-14427) Add support for ZK migration multi-ops transaction
[ https://issues.apache.org/jira/browse/KAFKA-14427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-14427: - Summary: Add support for ZK migration multi-ops transaction (was: Add support for ZK migration transactions) > Add support for ZK migration multi-ops transaction > -- > > Key: KAFKA-14427 > URL: https://issues.apache.org/jira/browse/KAFKA-14427 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: David Arthur >Assignee: David Arthur >Priority: Major
[jira] [Resolved] (KAFKA-14433) Clear all yammer metrics when test harnesses clean up
[ https://issues.apache.org/jira/browse/KAFKA-14433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-14433. -- Resolution: Fixed > Clear all yammer metrics when test harnesses clean up > - > > Key: KAFKA-14433 > URL: https://issues.apache.org/jira/browse/KAFKA-14433 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: David Arthur >Priority: Major > Attachments: image-2022-12-01-13-53-57-886.png, > image-2022-12-01-13-55-35-488.png > > > We should clear all yammer metrics from the yammer singleton when the > integration test harnesses clean up. This would avoid memory leaks in tests > that have a lot of test cases.
[jira] [Assigned] (KAFKA-14433) Clear all yammer metrics when test harnesses clean up
[ https://issues.apache.org/jira/browse/KAFKA-14433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reassigned KAFKA-14433: Assignee: David Arthur > Clear all yammer metrics when test harnesses clean up > - > > Key: KAFKA-14433 > URL: https://issues.apache.org/jira/browse/KAFKA-14433 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: David Arthur >Priority: Major
[GitHub] [kafka] mumrah merged pull request #12942: KAFKA-14433 Clear Yammer metrics in QuorumTestHarness#tearDown
mumrah merged PR #12942: URL: https://github.com/apache/kafka/pull/12942
[GitHub] [kafka] dajac commented on a diff in pull request #12914: KAFKA-14352: Rack-aware consumer partition assignment (KIP-881)
dajac commented on code in PR #12914: URL: https://github.com/apache/kafka/pull/12914#discussion_r1037881208 ## clients/src/main/resources/common/message/ConsumerProtocolAssignment.json: ## @@ -23,6 +23,7 @@ // that new versions cannot remove or reorder any of the existing fields. // // Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription. + // Version 3 adds rack id to ConsumerProtocolSubscription. "validVersions": "0-2", Review Comment: We need to bump the version here. ## clients/src/main/resources/common/message/ConsumerProtocolSubscription.json: ## @@ -24,6 +24,7 @@ // Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned // Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions. + // Version 3 adds rack id to enable rack-aware assignment. "validVersions": "0-2", Review Comment: We need to bump the version here as well. ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) { Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>(); for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, -Map<String, Subscription> subscriptions) { +public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic, + Map<String, Subscription> subscriptions) { Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions); +Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream() 
+.collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey())))); Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) { +for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) { Review Comment: I have a general question here. One of the key promises of the range assignor is that it co-partitions topics. If you have two topics (foo and bar) with three partitions each and two consumers, the first consumer will get foo-0, bar-0, foo-1, bar-1 and the second one will get foo-2, bar-2. I am trying to understand if we still maintain this property with the rack-aware assignment. Do we? I suppose that we do when all the partitions have all the same racks. I am not sure about the case where, for instance, foo would have no rack or only a subset of the racks (e.g. brokers are offline). ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -268,6 +304,47 @@ private Map> constrainedAssign(Map Collections.sort(unfilledMembersWithUnderMinQuotaPartitions); Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions); +unassignedPartitions = rackInfo.sortPartitionsByRackConsumers(unassignedPartitions); + +// Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota, +// otherwise, to minQuota +int nextUnfilledConsumerIndex = 0; +Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator(); +while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) { +TopicPartition unassignedPartition = unassignedIter.next(); +String consumer = null; +int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex); +if (nextIndex >= 0) { +consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex); +int assignmentCount = 
assignment.get(consumer).size() + 1; +if (assignmentCount >= minQuota) { + unfilledMembersWithUnderMinQuotaPartitions.remove(consumer); +if (assignmentCount < maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.add(consumer); +} else +nextIndex++; Review
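For context on the co-partitioning question raised in the review above, classic per-topic range assignment can be sketched in a small standalone example. The class and method names (`RangeSketch`, `rangeAssign`) are hypothetical illustrations for this sketch, not the real RangeAssignor code; it shows why the first sorted consumer gets the leading partitions of every topic, keeping foo and bar co-partitioned:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeSketch {
    // Per-topic range assignment: consumers are sorted, and each topic's
    // partitions are handed out in contiguous chunks starting from the same
    // (first) consumer, so corresponding partitions of every topic land on
    // the same consumer.
    static Map<String, List<String>> rangeAssign(List<String> consumers,
                                                 Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String c : sorted) assignment.put(c, new ArrayList<>());
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            int numPartitions = e.getValue(), numConsumers = sorted.size();
            int quota = numPartitions / numConsumers, extra = numPartitions % numConsumers;
            int next = 0;
            for (int i = 0; i < numConsumers; i++) {
                // The first `extra` consumers each take one additional partition.
                int count = quota + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++)
                    assignment.get(sorted.get(i)).add(e.getKey() + "-" + p);
                next += count;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("foo", 3);
        topics.put("bar", 3);
        // c1 gets the leading chunk of every topic, so foo and bar stay co-partitioned:
        System.out.println(rangeAssign(List.of("c1", "c2"), topics));
        // {c1=[bar-0, bar-1, foo-0, foo-1], c2=[bar-2, foo-2]}
    }
}
```

Rack-aware placement can break this property precisely when the offsets chosen per topic differ (e.g. one topic's partitions carry no rack information), which is the scenario the reviewer flags.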
[GitHub] [kafka] pprovenzano commented on pull request #12896: KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers
pprovenzano commented on PR #12896: URL: https://github.com/apache/kafka/pull/12896#issuecomment-1335323936 I have validated that the failing tests work by manually running them.
[jira] [Updated] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
[ https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Purshotam Chauhan updated KAFKA-14435: -- Affects Version/s: 3.2.3 3.2.2 3.2.1 3.2.0 > Kraft: StandardAuthorizer allowing a non-authorized user when > `allow.everyone.if.no.acl.found` is enabled > - > > Key: KAFKA-14435 > URL: https://issues.apache.org/jira/browse/KAFKA-14435 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.2.0, 3.2.1, 3.2.2, 3.2.3 >Reporter: Purshotam Chauhan >Assignee: Purshotam Chauhan >Priority: Critical > > When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow > everyone only if there is no ACL present for a particular resource. But if > there are ACLs present for the resource, then it shouldn't allow > everyone. > StandardAuthorizer is allowing the principals for which no ACLs are defined > even when the resource has other ACLs. > > This behavior can be validated with the following test case: > > {code:java} > @Test > public void testAllowEveryoneConfig() throws Exception { > StandardAuthorizer authorizer = new StandardAuthorizer(); > HashMap configs = new HashMap<>(); > configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris"); > configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true"); > authorizer.configure(configs); > authorizer.start(new > AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT))); > authorizer.completeInitialLoad(); > // Allow User:Alice to read topic "foobar" > List acls = asList( > withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", > WILDCARD, READ, ALLOW)) > ); > acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl())); > // User:Bob shouldn't be allowed to read topic "foobar" > assertEquals(singletonList(DENIED), > authorizer.authorize(new MockAuthorizableRequestContext.Builder(). 
> setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(), > singletonList(newAction(READ, TOPIC, "foobar")))); > } > {code} > > In the above test, `User:Bob` should be DENIED but the above test case fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
[ https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642497#comment-17642497 ] Purshotam Chauhan edited comment on KAFKA-14435 at 12/2/22 2:12 PM: This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` class. We can set this flag inside the `if` block [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523]. was (Author: JIRAUSER298490): This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` class. We can set this flag inside the `if` clause [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523].
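The `noResourceAcls` fix proposed in the comment above can be sketched in miniature. This is not the actual patch: the class name echoes the comment, but the method shape and the simplified ACL model below are assumptions for illustration only.

```java
// Sketch of the proposed fix: track whether the resource had ANY matching ACLs,
// so "allow.everyone.if.no.acl.found" only applies when none exist.
// The ACL model (a list of allowed principals) is a deliberate simplification.
import java.util.List;

public class MatchingAclBuilderSketch {
    enum AuthorizationResult { ALLOWED, DENIED }

    static AuthorizationResult authorize(List<String> aclsForResource,
                                         String principal,
                                         boolean allowEveryoneIfNoAclFound) {
        boolean noResourceAcls = aclsForResource.isEmpty(); // the proposed flag
        if (aclsForResource.contains(principal)) {
            return AuthorizationResult.ALLOWED;
        }
        // Fall back to "allow everyone" only when the resource truly has no ACLs.
        if (noResourceAcls && allowEveryoneIfNoAclFound) {
            return AuthorizationResult.ALLOWED;
        }
        return AuthorizationResult.DENIED;
    }

    public static void main(String[] args) {
        // Resource has an ACL for Alice, so Bob must be DENIED even with the config on.
        System.out.println(authorize(List.of("User:Alice"), "User:Bob", true)); // prints DENIED
        // No ACLs at all: everyone is allowed.
        System.out.println(authorize(List.of(), "User:Bob", true)); // prints ALLOWED
    }
}
```

This mirrors the bug report: the buggy behavior is equivalent to checking `allowEveryoneIfNoAclFound` without consulting the `noResourceAcls` flag.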
[jira] [Assigned] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
[ https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Purshotam Chauhan reassigned KAFKA-14435: - Assignee: Purshotam Chauhan
[GitHub] [kafka] lucasbru commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
lucasbru commented on PR #12935: URL: https://github.com/apache/kafka/pull/12935#issuecomment-1335257553 @cadonna Could you have a look? I ran this for a few hours in the soak and the native memory is stable. So I think this was the main leak
[GitHub] [kafka] divijvaidya commented on pull request #12940: MINOR: Remove lock contention while adding sensors
divijvaidya commented on PR #12940: URL: https://github.com/apache/kafka/pull/12940#issuecomment-1335252064 @mimaison please take a look when you get a chance! Thanks.
[GitHub] [kafka] divijvaidya commented on pull request #12940: MINOR: Remove lock contention while adding sensors
divijvaidya commented on PR #12940: URL: https://github.com/apache/kafka/pull/12940#issuecomment-1335251627 Rebased from trunk. Test failures are unrelated. ``` Build / JDK 11 and Scala 2.13 / testFailureToFenceEpoch(String).quorum=kraft – kafka.api.TransactionsTest Build / JDK 11 and Scala 2.13 / [1] true – org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest Build / JDK 17 and Scala 2.13 / testSecondaryRefreshAfterElapsedDelay() – org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwksTest Build / JDK 17 and Scala 2.13 / [3] Type=Raft-CoReside, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.3-IV3, Security=PLAINTEXT – kafka.admin.MetadataQuorumCommandTest Build / JDK 17 and Scala 2.13 / testNoConsumeWithoutDescribeAclViaSubscribe() – kafka.api.SaslGssapiSslEndToEndAuthorizationTest Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest Build / JDK 8 and Scala 2.12 / testWakeupAfterSyncGroupReceivedExternalCompletion() – org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest Build / JDK 8 and Scala 2.12 / shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once] – org.apache.kafka.streams.integration.EosIntegrationTest ```
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642502#comment-17642502 ] Lucas Brutschy commented on KAFKA-12679: I referenced this ticket in the PR > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Priority: Major > Fix For: 3.4.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. 
> > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread.
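The idea in the attached patch (detect the lock conflict and sleep instead of spinning) can be sketched roughly as follows. The `StateDirectory` interface, the task id, and the timing constants are stand-ins for this sketch, not the actual Kafka Streams internals.

```java
// Sketch of retry-with-backoff for acquiring a task's state directory lock,
// instead of the tight retry loop described in the ticket.
public class LockBackoffSketch {

    // Stand-in for the real state directory: lock() returns false while another
    // thread still holds the task directory.
    interface StateDirectory {
        boolean lock(String taskId);
    }

    static boolean tryLockWithBackoff(StateDirectory dir, String taskId,
                                      int maxAttempts, long backoffMs)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (dir.lock(taskId)) {
                return true;             // lock acquired
            }
            Thread.sleep(backoffMs);     // back off instead of spinning at full CPU
        }
        return false;                    // give up; caller retries on the next runLoop pass
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate the old thread releasing the lock before the third attempt.
        final int[] calls = {0};
        StateDirectory dir = taskId -> ++calls[0] >= 3;
        System.out.println(tryLockWithBackoff(dir, "0_34", 5, 10L)); // prints true
    }
}
```

The key difference from the reported behavior is the `Thread.sleep(backoffMs)` between attempts, which keeps the new task from starving the thread that still legitimately holds the directory.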
[jira] [Comment Edited] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
[ https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642497#comment-17642497 ] Purshotam Chauhan edited comment on KAFKA-14435 at 12/2/22 1:21 PM: This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` class. We can set this flag inside the `if` clause [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523]. was (Author: JIRAUSER298490): This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` class. We can set this flag inside the `if` clause [here.|#L523].]
[jira] [Comment Edited] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
[ https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642497#comment-17642497 ] Purshotam Chauhan edited comment on KAFKA-14435 at 12/2/22 1:20 PM: This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` class. We can set this flag inside the `if` clause [here.|#L523].] was (Author: JIRAUSER298490): This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` class. We can set this flag inside the if clause [here|[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523].]
[jira] [Commented] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
[ https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642497#comment-17642497 ] Purshotam Chauhan commented on KAFKA-14435: --- This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` class. We can set this flag inside the if clause [here|[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523].]
[jira] [Created] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
Purshotam Chauhan created KAFKA-14435: - Summary: Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled Key: KAFKA-14435 URL: https://issues.apache.org/jira/browse/KAFKA-14435 Project: Kafka Issue Type: Bug Components: kraft Reporter: Purshotam Chauhan
[jira] [Updated] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
[ https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-14278: --- Component/s: producer streams > Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit > --- > > Key: KAFKA-14278 > URL: https://issues.apache.org/jira/browse/KAFKA-14278 > Project: Kafka > Issue Type: Sub-task > Components: producer , streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major >
[jira] [Assigned] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
[ https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14278: -- Assignee: Lucas Brutschy
[jira] [Updated] (KAFKA-14309) Kafka Streams upgrade tests do not cover for FK-joins
[ https://issues.apache.org/jira/browse/KAFKA-14309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-14309: --- Component/s: streams > Kafka Streams upgrade tests do not cover for FK-joins > - > > Key: KAFKA-14309 > URL: https://issues.apache.org/jira/browse/KAFKA-14309 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The current streams upgrade system test for FK joins inserts the production > of foreign key data and an actual foreign key join in every version of > SmokeTestDriver except for the latest. The effect was that FK join upgrades > are not tested at all, since no FK join code is executed after the bounce in > the system test. > We should enable the FK-join code in the system test.
[GitHub] [kafka] divijvaidya commented on pull request #12940: MINOR: Remove lock contention while adding sensors
divijvaidya commented on PR #12940: URL: https://github.com/apache/kafka/pull/12940#issuecomment-1335028608 The build is failing with: ``` [2022-12-01T20:35:07.343Z] FAILURE: Build completed with 2 failures. [2022-12-01T20:35:07.343Z] [2022-12-01T20:35:07.343Z] 1: Task failed with an exception. [2022-12-01T20:35:07.343Z] --- [2022-12-01T20:35:07.343Z] * What went wrong: [2022-12-01T20:35:07.343Z] Execution failed for task ':connect:file:compileJava'. [2022-12-01T20:35:07.343Z] > Compilation failed; see the compiler error output for details. [2022-12-01T20:35:07.343Z] [2022-12-01T20:35:07.343Z] * Try: [2022-12-01T20:35:07.343Z] > Run with --stacktrace option to get the stack trace. [2022-12-01T20:35:07.343Z] > Run with --info or --debug option to get more log output. [2022-12-01T20:35:07.343Z] > Run with --scan to get full insights. [2022-12-01T20:35:07.343Z] == [2022-12-01T20:35:07.343Z] [2022-12-01T20:35:07.343Z] 2: Task failed with an exception. [2022-12-01T20:35:07.343Z] --- [2022-12-01T20:35:07.343Z] * What went wrong: [2022-12-01T20:35:07.343Z] Unable to make progress running work. There are items queued for execution but none of them can be started [2022-12-01T20:35:07.343Z] [2022-12-01T20:35:07.343Z] * Try: [2022-12-01T20:35:07.343Z] > Run with --stacktrace option to get the stack trace. [2022-12-01T20:35:07.343Z] > Run with --info or --debug option to get more log output. [2022-12-01T20:35:07.343Z] > Run with --scan to get full insights. [2022-12-01T20:35:07.343Z] == ``` But strangely it passes locally for me: ``` diviv@147dda742970:~/oss/kafka|minor-lock-contention ⇒ ./gradlew :connect:file:compileJava > Configure project : Starting build with version 3.4.0-SNAPSHOT (commit id 15aa84a7) using Gradle 7.6, Java 17 and Scala 2.13.8 Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0 BUILD SUCCESSFUL in 15s 6 actionable tasks: 5 executed, 1 up-to-date ```
[GitHub] [kafka] divijvaidya commented on pull request #12933: MINOR: Optimize metric recording when quota check not required
divijvaidya commented on PR #12933: URL: https://github.com/apache/kafka/pull/12933#issuecomment-1335021913 @mimaison please take a look when you get a chance.
[jira] [Commented] (KAFKA-14374) Kafka streams losing messages in State Store during first launch of app
[ https://issues.apache.org/jira/browse/KAFKA-14374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642374#comment-17642374 ] Youcef Sebiat commented on KAFKA-14374: --- Thanks for the response. 1. {quote}Did you check the content of RocksDB (using interactive queries) to see if data is missing there? If it's missing in RocksDB too, flushing to the changelog should not be the issue, as the data was never put into the store to begin with. {quote} We did not check this yet. We will have to have a look at it. 2. {quote}I also see in the topology visualisation you added to the ticket, that there is two filters before repartitioning – just wondering what they do and if they drop records, how can the content of the input topic be the same as the content of the result topic? {quote} The first filter is to drop tombstone records and the second is to flatMap the new key. In this specific case, there are no tombstones (we relaunched our CDC to make a snapshot of the table in the DB, so everything is in create mode) and the key generator is the identity with casting to int. 3. {quote}Last, after data from the repartition topic was processed downstream, we would issue a "purge" request and delete records. How do you ensure that the repartition topic is really complete? {quote} We launched kafka-console-consumer before launching an app and wrote the results to a file. We made sure that we have the exact same number of events as the repartition topic. 4. {quote}Are there any deletes happening? Note that the repartition topic is configured with "log retention", while the changelog topic is configured with "log compaction" – thus, the repartition topic could contain two record `` and `` (ie k1 exists physically but is actually logically deleted), while the changelog topic might have been compacted already and the key k1 was purged from it. {quote} There are no deletes happening as we paused the CDC connector that feeds the input topic. 
What concerns us is that we are executing the same exact topology, on the same exact input, and that the state stores are different depending if we are having multi threads or single thread mode. What is more concerning is that it is specifically the partitions of the group leader in the multi-thread that is consistently losing messages. > Kafka streams losing messages in State Store during first launch of app > --- > > Key: KAFKA-14374 > URL: https://issues.apache.org/jira/browse/KAFKA-14374 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0, 3.2.0 >Reporter: Youcef Sebiat >Priority: Major > Attachments: Screenshot 2022-11-09 at 14.56.00.png > > > We have been using Kafka Streams to implement a CDC based app. Attached is > the sub topology of interest. > `table2` topic is created by Debezium who is connected to a SQL DB. It > contains 26K lines. We take `table2` and create a key which is only a > conversion of the key from `string` to `int`. This means that we should > expect that #table2=#repartition-topic=#state-store; which actually is not > verified. What we end up with is the following #table2=#repartition-topic, > but #repartition-topic>#state-store. We actually lose messages and thus > corrupt the state store, which makes the app live in incorrect state. (Please > note that there is no insertion in `table2` as we paused the connector to > verify the cardinality.) > The above happens only during the first launch, i.e. the app has never been > launched before, so internal topics do not exist yet. Restarts of > pre-existing apps do not yield any problems. > We have: > 1. Broker on Kafka 3.2. > 2. Client app on 2.8|3.2 (we tried both and we faced the same issue). > 2. All parameters are by default except `CACHE_MAX_BYTES_BUFFERING_CONFIG` > set to `0` and `NUM_STREAM_THREADS_CONFIG` set to `>1`. > > *What actually worked* > 1. Use a monothread at first launch: using one thread solves the problem. 
The > #table2=#repartition-topic=#state-store is verified. > 2. Pre-creating kafka internal topics: we noticed that whenever there is > rebalance during the first launch of Kafka Streams app, the state stores > ended up missing values. This also happens when you launch multiple pods in > K8s for example. When we read through the logs, we noticed that there is a > rebalance that is triggered when we first launch the app. This comes from the > fact that the internal topics get created and assigned, thus the rebalance. > So by creating the internal topics before, we avoid the rebalance and we end > up by #table2=#repartition-topic=#state-store. > *What we noticed from the logs* > On multi-thread mode, we noticed that it is the partitions that are assigned > to the thread
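The pre-creation workaround described above relies on the Kafka Streams internal topic naming convention, `<application.id>-<name>-changelog` and `<application.id>-<name>-repartition`, so the topics can be created before the app's first launch and the rebalance race is avoided. A tiny sketch of deriving those names; the application id and store/node names below are illustrative guesses, not the reporter's actual values:

```java
// Derive the internal topic names Kafka Streams would otherwise auto-create,
// so they can be pre-created (e.g. via kafka-topics.sh or AdminClient).
public class InternalTopicNames {

    static String changelog(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    static String repartition(String applicationId, String nodeName) {
        return applicationId + "-" + nodeName + "-repartition";
    }

    public static void main(String[] args) {
        // Hypothetical app id and names for the table2 rekeying described in the ticket.
        System.out.println(changelog("cdc-app", "table2-store"));    // prints cdc-app-table2-store-changelog
        System.out.println(repartition("cdc-app", "table2-rekey"));  // prints cdc-app-table2-rekey-repartition
    }
}
```

Pre-creating these with the same partition count as the input topic means the first launch sees all internal topics already assigned, so no mid-restore rebalance is triggered.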
[jira] [Resolved] (KAFKA-14400) KStream - KStream - LeftJoin() does not call ValueJoiner with null value
[ https://issues.apache.org/jira/browse/KAFKA-14400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor van den Hoven resolved KAFKA-14400. -- Resolution: Not A Bug It behaves differently because stream-stream left-join semantics have changed. > KStream - KStream - LeftJoin() does not call ValueJoiner with null value > - > > Key: KAFKA-14400 > URL: https://issues.apache.org/jira/browse/KAFKA-14400 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.1, 3.3.1 > Environment: Windows PC >Reporter: Victor van den Hoven >Priority: Major > Attachments: Afbeelding 2.png, SimpleStreamTopology.java, > SimpleStreamTopologyTest.java > > > In Kafka-streams 3.1.1 : > When using +JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1))+ > the KStream {*}leftJoin{*}(KStream otherStream, ValueJoiner {_}joiner{_}, > JoinWindows windows) does not seem to call the _joiner_ with null value when > join predicate is not satisfied (not expected). > > When using deprecated +JoinWindows.of(Duration.ofMillis(1));+ > the KStream {*}leftJoin{*}(KStream otherStream, ValueJoiner {_}joiner{_}, > JoinWindows windows) does > call the _joiner_ with null value when join predicate is not satisfied (as > expected and documented). > > Attached you can find two files with TopologyTestDriver Unit test to > reproduce.
[jira] [Commented] (KAFKA-14400) KStream - KStream - LeftJoin() does not call ValueJoiner with null value
[ https://issues.apache.org/jira/browse/KAFKA-14400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642366#comment-17642366 ] Victor van den Hoven commented on KAFKA-14400: -- Thank you for answering my questions. :)
[jira] [Commented] (KAFKA-14400) KStream - KStream - LeftJoin() does not call ValueJoiner with null value
[ https://issues.apache.org/jira/browse/KAFKA-14400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642353#comment-17642353 ] Matthias J. Sax commented on KAFKA-14400: - {quote}Am I correct that when one +LAST+ input record is written to the left-input stream not within any window and when no more input records are written to any of the input streams, then this LAST input record will never be emitted? {quote} That's correct. In general, this is not an issue though, as in a stream-processing world there is no such thing as a "last input record". It can be annoying when writing tests, but for this case you can always add a "dummy" record as the very last record to flush out all records you actually want to test. {quote}In the kstream-kstream-join table of the "Improved Left/Outer Stream-Stream Join" on the wiki page that was referred to: Why is "d,ts=14" not also joined with "A,ts=3" as the window size is 15? And why is "D,ts=15" not also joined with "A,ts=3"? {quote} Seems to be an error in the table – good catch! I fixed it.
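The behavior discussed in this thread, that an unmatched left record only produces a `joiner(left, null)` call after stream time advances past its window (which is why a trailing "dummy" record is needed to flush results in tests), can be sketched in plain Java. This is a toy simulation of the improved left-join semantics, not Kafka's implementation; the `Event` type and `leftJoin` helper are invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class LeftJoinSketch {
    record Event(String value, long ts) {}

    // Toy model of the improved stream-stream left join: matches within the
    // window are emitted, but an unmatched left record produces a left-null
    // result only once stream time has advanced past its window, i.e. once
    // no future right record could still match it.
    static List<String> leftJoin(List<Event> left, List<Event> right,
                                 long windowMs, long streamTime) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            boolean matched = false;
            for (Event r : right) {
                if (Math.abs(l.ts() - r.ts()) <= windowMs) {
                    out.add(l.value() + "+" + r.value());
                    matched = true;
                }
            }
            // The left-null result is withheld until the window is closed.
            if (!matched && streamTime > l.ts() + windowMs) {
                out.add(l.value() + "+null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("A", 3));
        List<Event> right = List.of();
        // Stream time has not passed A's window yet: no output.
        System.out.println(leftJoin(left, right, 1, 3));   // []
        // A later (dummy) record advances stream time and flushes A+null.
        System.out.println(leftJoin(left, right, 1, 100)); // [A+null]
    }
}
```

The old, eager semantics under the deprecated `JoinWindows.of(...)` would correspond to emitting `A+null` immediately, without the stream-time check, which is exactly the difference the reporter observed.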
[GitHub] [kafka] cmccabe merged pull request #12837: MINOR: extract jointly owned parts of BrokerServer and ControllerServer
cmccabe merged PR #12837: URL: https://github.com/apache/kafka/pull/12837 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14184) Kafka streams application crashes due to "UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks."
[ https://issues.apache.org/jira/browse/KAFKA-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642351#comment-17642351 ] Matthias J. Sax commented on KAFKA-14184: - Thanks for the update. – It's hard to say what might have changed between 2.7 and 3.2 – there are a lot of versions in between. Given that it's very unlikely that we would have a bug-fix release for older versions anyway, it might not be worth spending time to figure it out, given that it seems to work as expected in 3.2.2. > Kafka streams application crashes due to "UnsupportedOperationException: this > should not happen: timestamp() is not supported in standby tasks." > > > Key: KAFKA-14184 > URL: https://issues.apache.org/jira/browse/KAFKA-14184 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.7.0 > Reporter: Suresh Rukmangathan > Priority: Critical > > Kafka streams application is crashing with the following stack trace, with 3 frames from the app removed that are process/state-store related functions.
> {code:java}
> java.lang.UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks.
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.throwUnsupportedOperationExceptionIfStandby(ProcessorContextImpl.java:352)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:328)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:136)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:78)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:197)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:197)
>     at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120)
>     // app calls to process & save to state store - 3 frames
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> {code}
>
> Key Kafka Streams application configuration details are as below:
> {code:java}
> {replication.factor=1, num.standby.replicas=1, topology.optimization=all, max.request.size=1048576, auto.offset.reset=earliest}
> {code}
>
> If the Kafka Streams replication factor = 1 and standby replicas = 1, is that an issue? Do we expect that the replication factor should be at least n+1 if standby replicas = 1, or is there no relationship?
>
> A couple more data points:
> # Crash stopped once I set the standby replicas to 0.
> # Crash also stopped once I reduced the number of instances to one (only one pod - one pod has only one instance of the application running).
>
> So, is there something that is
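The reporter's first workaround, disabling standby tasks, can be expressed directly in the Streams configuration. This is only a sketch of the mitigation described in the thread, not a root-cause fix (the thread notes the crash no longer reproduces on 3.2.2):

```properties
# Workaround reported for the 2.7.0 crash in this thread:
# disabling standby tasks avoids the timestamp() call on a standby task.
num.standby.replicas=0
```

Note that setting `num.standby.replicas=0` trades away warm failover replicas, so restoration after a failure takes longer; upgrading to a version where the bug does not reproduce is the better path.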