[GitHub] [kafka] ewencp commented on a change in pull request #10915: Enable connecting VS Code remote debugger
ewencp commented on a change in pull request #10915: URL: https://github.com/apache/kafka/pull/10915#discussion_r657638116 ## File path: tests/README.md ## @@ -51,6 +51,40 @@ bash tests/docker/ducker-ak up -j 'openjdk:11'; tests/docker/run_tests.sh ``` REBUILD="t" bash tests/docker/run_tests.sh ``` +* Debug tests in VS Code: + - Run test with `--debug` flag (can be before or after file name): Review comment: As in other thread, I'm fine either way. I prefer just getting things to a clean, consistent state, especially since these are tests so we don't really have the same compatibility requirements. But if that creates too much overhead/confusion (especially since we need to continue running tests on older branches), this alternative approach seems acceptable even if a bit unusual. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stan-confluent commented on a change in pull request #10915: Enable connecting VS Code remote debugger
stan-confluent commented on a change in pull request #10915: URL: https://github.com/apache/kafka/pull/10915#discussion_r657636156 ## File path: tests/README.md ## @@ -51,6 +51,40 @@ bash tests/docker/ducker-ak up -j 'openjdk:11'; tests/docker/run_tests.sh ``` REBUILD="t" bash tests/docker/run_tests.sh ``` +* Debug tests in VS Code: + - Run test with `--debug` flag (can be before or after file name): Review comment: We can still keep this, though I do need to update run_tests.sh to pass _DUCKTAPE_OPTIONS after the `--`. My idea was that I didn't want to have to make sure the flags don't overlap between ducktape and ducker-ak, hence I changed ducker-ak to expect ducktape args after the `--`, like this: `ducker-ak test my_test.py -- --ducktape-flag`. I am, however, open to simply using a different flag name: it is unlikely we'll add many flags to the ducker-ak run command anyway, so we can simply make sure they don't match the ducktape ones. @omkreddy and @ewencp (with whom we had a similar conversation in a different repo), which one do you prefer?
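The `--` separator convention described in this comment can be sketched as follows. This is only an illustration of the idea, assuming a wrapper that keeps everything before the first `--` for itself and passes everything after it through untouched; `ArgSplitter` and its fields are hypothetical names and are not part of ducker-ak or ducktape.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper illustrating the "--" separator convention; not part of ducker-ak. */
final class ArgSplitter {
    /** Arguments before the first "--", meant for the wrapper command itself. */
    final List<String> wrapperArgs;
    /** Arguments after the first "--", passed through to the wrapped tool unchanged. */
    final List<String> passThroughArgs;

    private ArgSplitter(List<String> wrapperArgs, List<String> passThroughArgs) {
        this.wrapperArgs = wrapperArgs;
        this.passThroughArgs = passThroughArgs;
    }

    /** Splits the argument list at the first "--"; without one, nothing is passed through. */
    static ArgSplitter split(String... args) {
        List<String> all = Arrays.asList(args);
        int separator = all.indexOf("--");
        if (separator < 0) {
            return new ArgSplitter(new ArrayList<>(all), new ArrayList<>());
        }
        return new ArgSplitter(
            new ArrayList<>(all.subList(0, separator)),
            new ArrayList<>(all.subList(separator + 1, all.size())));
    }

    public static void main(String[] args) {
        ArgSplitter result = split("test", "my_test.py", "--", "--ducktape-flag");
        System.out.println("wrapper: " + result.wrapperArgs);
        System.out.println("pass-through: " + result.passThroughArgs);
    }
}
```

With this approach the two tools can reuse the same flag names without ambiguity, which is the trade-off being weighed against simply picking non-overlapping flag names.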
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time
skaundinya15 commented on a change in pull request #10743: URL: https://github.com/apache/kafka/pull/10743#discussion_r657611619 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java ## @@ -29,9 +32,9 @@ */ @InterfaceStability.Evolving public class DeleteConsumerGroupsResult { -private final Map> futures; +private final Map> futures; -DeleteConsumerGroupsResult(final Map> futures) { +DeleteConsumerGroupsResult(Map> futures) { Review comment: @tombentley I see, thank you for the clarification. I guess in that case we have to stick with returning the `KafkaFutureImpl` then? There doesn't really seem to be a way around this otherwise.
[jira] [Commented] (KAFKA-12987) Kafka has no brute-force protection for user passwords
[ https://issues.apache.org/jira/browse/KAFKA-12987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368600#comment-17368600 ] Luke Chen commented on KAFKA-12987: --- [~ohye3166], thanks for your report. Could you translate it into English? Also, could you add steps to reproduce, or describe the area where an attacker can brute-force passwords? Thank you. > Kafka has no brute-force protection for user passwords > - > > Key: KAFKA-12987 > URL: https://issues.apache.org/jira/browse/KAFKA-12987 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 2.7.1 >Reporter: chenzongyi >Priority: Major > > A wrong password can be tried repeatedly; there is no brute-force protection. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jolshan commented on pull request #10923: KAFKA-12976: Remove UNSUPPORTED_VERSION error from delete and describe topics calls
jolshan commented on pull request #10923: URL: https://github.com/apache/kafka/pull/10923#issuecomment-867292878 I don't think we need to support unsupported version errors in the client.
[jira] [Created] (KAFKA-12987) Kafka has no brute-force protection for user passwords
chenzongyi created KAFKA-12987: -- Summary: Kafka has no brute-force protection for user passwords Key: KAFKA-12987 URL: https://issues.apache.org/jira/browse/KAFKA-12987 Project: Kafka Issue Type: Improvement Components: admin Affects Versions: 2.7.1 Reporter: chenzongyi A wrong password can be tried repeatedly; there is no brute-force protection.
[GitHub] [kafka] hachikuji commented on a change in pull request #10900: KAFKA-12967; KRaft broker should forward DescribeQuorum to controller
hachikuji commented on a change in pull request #10900: URL: https://github.com/apache/kafka/pull/10900#discussion_r657576935 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -217,6 +227,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) case ApiKeys.ALLOCATE_PRODUCER_IDS => maybeForwardToController(request, handleAllocateProducerIdsRequest) +case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request) Review comment: Yeah, because DescribeQuorum does not make sense for zk clusters. As far as I know, this is the only case of a client api which is only exposed under kraft. However, we do have some internal controller apis that are only used by kraft (e.g. for broker registration and heartbeating). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
g1geordie commented on pull request #10190: URL: https://github.com/apache/kafka/pull/10190#issuecomment-867277063 @bbejeck The failed tests also appear in other issues. I think they are unrelated.
```
org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
```
Thank you.
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r657558091 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -167,21 +245,14 @@ object LogLoader extends Logging { * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than * the smallest offset .clean file could be part of an incomplete split operation. Such .swap files are also deleted * by this method. + * * @param params The parameters for the log being loaded from disk - * @return Set of .swap files that are valid to be swapped in as segment files + * @return Set of .swap files that are valid to be swapped in as segment files and index files Review comment: No, we are not renaming .cleaned files to .swap files due to KAFKA-6264. I forgot to update the description of the PR. Just updated it: please see the updated one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on pull request #10763: URL: https://github.com/apache/kafka/pull/10763#issuecomment-867273158 @junrao Thanks for the review. Addressed your comments.
[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time
tombentley commented on a change in pull request #10743: URL: https://github.com/apache/kafka/pull/10743#discussion_r657570794 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java ## @@ -29,9 +32,9 @@ */ @InterfaceStability.Evolving public class DeleteConsumerGroupsResult { -private final Map> futures; +private final Map> futures; -DeleteConsumerGroupsResult(final Map> futures) { +DeleteConsumerGroupsResult(Map> futures) { Review comment: @skaundinya15 those methods in `KafkaFuture` are intentionally not `public` because a user receiving an instance should _never_ need to complete the future (they're always completed by the admin client). Making them `public` would thus make the API less type safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
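The type-safety argument in this comment can be sketched as follows: callers receive a read-only future type, while only the owner holds a type on which completion is public. This is a minimal illustration under assumed names (`ReadOnlyFuture` and `OwnerFuture` are hypothetical) and does not reproduce the actual `KafkaFuture` or `KafkaFutureImpl` classes.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical read-only view handed to callers; not Kafka's KafkaFuture. */
abstract class ReadOnlyFuture<T> {
    public abstract boolean isDone();

    /** Blocks until a value is available, then returns it. */
    public abstract T get();

    // Deliberately not public: only the owner should complete the future.
    // Java's protected also allows same-package access, so in practice the
    // owner and its callers would live in different packages.
    protected abstract boolean complete(T value);
}

/** Hypothetical implementation kept by the owner, for example a client library. */
final class OwnerFuture<T> extends ReadOnlyFuture<T> {
    private final CompletableFuture<T> delegate = new CompletableFuture<>();

    @Override
    public boolean isDone() {
        return delegate.isDone();
    }

    @Override
    public T get() {
        return delegate.join();
    }

    // Widened to public so the owner can complete it through the concrete type.
    @Override
    public boolean complete(T value) {
        return delegate.complete(value);
    }
}
```

Returning the abstract type from a public API keeps completion out of the caller's reach, whereas returning the concrete type exposes it, which appears to be the tension discussed in this thread.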
[GitHub] [kafka] ableegoldman commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
ableegoldman commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r657560587 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1558,12 +1607,45 @@ private void processStreamThread(final Consumer consumer) { for (final StreamThread thread : copy) consumer.accept(thread); } +/** + * Returns runtime information about the local threads of this {@link KafkaStreams} instance. + * + * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}. + * @deprecated since 3.0 use {@link #threadsMetadata()} + */ +@Deprecated +@SuppressWarnings("deprecation") +public Set localThreadsMetadata() { +return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata( +threadMetadata.threadName(), +threadMetadata.threadState(), +threadMetadata.consumerClientId(), +threadMetadata.restoreConsumerClientId(), +threadMetadata.producerClientIds(), +threadMetadata.adminClientId(), +threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata( +taskMetadata.taskId().toString(), +taskMetadata.topicPartitions(), +taskMetadata.committedOffsets(), +taskMetadata.endOffsets(), +taskMetadata.timeCurrentIdlingStarted()) +).collect(Collectors.toSet()), +threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata( +taskMetadata.taskId().toString(), +taskMetadata.topicPartitions(), +taskMetadata.committedOffsets(), +taskMetadata.endOffsets(), +taskMetadata.timeCurrentIdlingStarted()) +).collect(Collectors.toSet( +.collect(Collectors.toSet()); +} + /** * Returns runtime information about the local threads of this {@link KafkaStreams} instance. * * @return the set of {@link ThreadMetadata}. 
 */ -public Set localThreadsMetadata() { +public Set threadsMetadata() { Review comment: @cadonna `localThreadMetadata` still sounds more correct to me than `localThreadsMetadata`. I really can't explain it other than to say that English is weird, and names/titles like this do not always follow the regular rules of grammar/plurals 🤷‍♀️ But actually I like your suggestion `metadataForLocalThreads()` even better than any of them, SGTM
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r657560081 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -106,7 +174,17 @@ object LogLoader extends Logging { loadSegmentFiles(params) }) -completeSwapOperations(swapFiles, params) +// Do the actual recovery for toRecoverSwapFiles, as discussed above. Review comment: you are right if we don't need to do sanity checks. Removed this ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,79 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction operation. We can simply rename them +// to regular segment files. But, before renaming, we should figure out which segments are +// compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. +// If sanity check fails, we cannot do the simple renaming, we must do a full recovery, which +// involves rebuilding all the index files and the producer state. +// We store segments that require renaming and recovery in this code block, and do the actual +// renaming and recovery later. 
+var minSwapFileOffset = Long.MaxValue +var maxSwapFileOffset = Long.MinValue +val toRenameSwapFiles = mutable.Set[File]() +val toRecoverSwapFiles = mutable.Set[File]() +swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "".foreach { f => + val baseOffset = offsetFromFile(f) + val segment = LogSegment.open(f.getParentFile, +baseOffset = baseOffset, +params.config, +time = params.time, +fileSuffix = Log.SwapFileSuffix) + try { +segment.sanityCheck(false) Review comment: fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r657559556 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,79 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction operation. We can simply rename them Review comment: Are you concerned about the logic or the comment? If only the comment, I have fixed it.
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r657558091 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -167,21 +245,14 @@ object LogLoader extends Logging { * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than * the smallest offset .clean file could be part of an incomplete split operation. Such .swap files are also deleted * by this method. + * * @param params The parameters for the log being loaded from disk - * @return Set of .swap files that are valid to be swapped in as segment files + * @return Set of .swap files that are valid to be swapped in as segment files and index files Review comment: No, we are not renaming .cleaned files to .swap files due to KAFKA-6264. I forgot to update the description of the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r657558091 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -167,21 +245,14 @@ object LogLoader extends Logging { * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than * the smallest offset .clean file could be part of an incomplete split operation. Such .swap files are also deleted * by this method. + * * @param params The parameters for the log being loaded from disk - * @return Set of .swap files that are valid to be swapped in as segment files + * @return Set of .swap files that are valid to be swapped in as segment files and index files Review comment: No, we are not renaming .cleaned files to .swap files. I forgot to update the description of the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368542#comment-17368542 ] A. Sophie Blee-Goldman commented on KAFKA-12984: [~mjsax] Technically yes, the issue with the SubscriptionState potentially providing invalid "ownedPartitions" input can affect Kafka Streams as well. However, the impact for Streams is considerably less severe, as the assignment algorithm it uses doesn't make any assumptions about the previous assignment being valid. The worst that should happen to a Streams application is that the assignment could be slightly sub-optimal, with a partition/active task being assigned to a member that had dropped out of the group since being assigned that partition, instead of its true current owner. > Cooperative sticky assignor can get stuck with invalid SubscriptionState > input metadata > --- > > Key: KAFKA-12984 > URL: https://issues.apache.org/jira/browse/KAFKA-12984 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > Some users have reported seeing their consumer group become stuck in the > CompletingRebalance phase when using the cooperative-sticky assignor. Based > on the request metadata we were able to deduce that multiple consumers were > reporting the same partition(s) in their "ownedPartitions" field of the > consumer protocol. Since this is an invalid state, the input causes the > cooperative-sticky assignor to detect that something is wrong and throw an > IllegalStateException. If the consumer application is set up to simply retry, > this will cause the group to appear to hang in the rebalance state. > The "ownedPartitions" field is encoded based on the ConsumerCoordinator's > SubscriptionState, which was assumed to always be up to date. 
However there > may be cases where the consumer has dropped out of the group but fails to > clear the SubscriptionState, allowing it to report some partitions as owned > that have since been reassigned to another member. > We should (a) fix the sticky assignment algorithm to resolve cases of > improper input conditions by invalidating the "ownedPartitions" in cases of > double ownership, and (b) shore up the ConsumerCoordinator logic to better > handle rejoining the group and keeping its internal state consistent. See > KAFKA-12983 for more details on (b) -- This message was sent by Atlassian Jira (v8.3.4#803005)
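Proposal (a) in the issue description, invalidating "ownedPartitions" claims in cases of double ownership, can be sketched roughly as follows. This is an illustration of the idea only, not the cooperative-sticky assignor's actual code: `OwnedPartitionsSanitizer` is a hypothetical name, and partitions are reduced to plain integers for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch: drop any partition that more than one member claims to own. */
final class OwnedPartitionsSanitizer {

    /**
     * Returns a copy of the claims in which every partition claimed by two or
     * more members has been removed from all of them, so that no stale claim
     * is trusted. Partitions claimed exactly once are kept.
     */
    static Map<String, Set<Integer>> dropDoublyOwned(Map<String, Set<Integer>> claimed) {
        // Count how many members claim each partition.
        Map<Integer, Integer> claimCounts = new HashMap<>();
        for (Set<Integer> partitions : claimed.values()) {
            for (Integer partition : partitions) {
                claimCounts.merge(partition, 1, Integer::sum);
            }
        }
        // Keep only the partitions with a single claimant.
        Map<String, Set<Integer>> sanitized = new TreeMap<>();
        for (Map.Entry<String, Set<Integer>> entry : claimed.entrySet()) {
            Set<Integer> kept = new TreeSet<>();
            for (Integer partition : entry.getValue()) {
                if (claimCounts.get(partition) == 1) {
                    kept.add(partition);
                }
            }
            sanitized.put(entry.getKey(), kept);
        }
        return sanitized;
    }
}
```

Treating a doubly claimed partition as unowned lets the assignment proceed instead of throwing, at the cost of possibly moving that partition away from its true current owner.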
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r657531976 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -1352,8 +1370,7 @@ class ReplicaManagerTest { Optional.of(1)) val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10) assertNull(fetchResult.get) - -Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true) + Mockito.when(replicaManager.metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true) Review comment: Your proposed change doesn't compile because `metadataCache` is not defined in this scope.
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r657531588 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -62,17 +46,22 @@ trait MetadataCache { def getAllTopics(): collection.Set[String] - def getAllPartitions(): collection.Set[TopicPartition] + def getTopicPartitions(topicName: String): collection.Set[TopicPartition] - def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + def hasAliveBroker(brokerId: Int): Boolean - def getAliveBroker(brokerId: Int): Option[MetadataBroker] + def getAliveBrokers(): Iterable[BrokerMetadata] - def getAliveBrokers: collection.Seq[MetadataBroker] + def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] + + def getAliveBrokerNodes(listenerName: String): Iterable[Node] def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] - def numPartitions(topic: String): Option[Int] + /** + * Return the number of partitions in the given topic, or 0 if the given topic does not exist. + */ + def numPartitions(topic: String): Int Review comment: It just seems awkward because typically when you ask "how many partitions?" the answer is not Some or None, but a number. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
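The contract in the new doc comment, returning a plain count with 0 for a missing topic rather than an optional value, can be sketched like this. `TopicPartitionCounts` is a hypothetical stand-in and is not Kafka's `MetadataCache`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a metadata cache; not Kafka's MetadataCache. */
final class TopicPartitionCounts {
    private final Map<String, Integer> partitionsByTopic = new HashMap<>();

    /** Records the partition count for a topic. */
    void put(String topic, int partitions) {
        partitionsByTopic.put(topic, partitions);
    }

    /** Returns the number of partitions in the topic, or 0 if the topic does not exist. */
    int numPartitions(String topic) {
        return partitionsByTopic.getOrDefault(topic, 0);
    }

    public static void main(String[] args) {
        TopicPartitionCounts counts = new TopicPartitionCounts();
        counts.put("orders", 3);
        System.out.println(counts.numPartitions("orders"));
        System.out.println(counts.numPartitions("missing"));
    }
}
```

Because an existing topic has at least one partition, a result of 0 can still be read as "no such topic" by callers that care about the distinction.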
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r657530812 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -2886,7 +2885,7 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback(error)(partitionErrors) } else { val partitions = if (electionRequest.data.topicPartitions == null) { -metadataCache.getAllPartitions() + metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions(_)) Review comment: ok
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r657530674 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1164,7 +1164,7 @@ class KafkaApis(val requestChannel: RequestChannel, var unauthorizedForCreateTopics = Set[String]() if (authorizedTopics.nonEmpty) { - val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) + val nonExistingTopics = authorizedTopics.filter(!metadataCache.contains(_)) Review comment: ok
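The two KafkaApis changes in this refactor replace dedicated cache methods with compositions of smaller ones: a filter over `contains` and a flat-map over per-topic partitions. A rough Java sketch of the same composition follows; `MiniMetadataCache` and its string-based partition labels are hypothetical simplifications and do not mirror the real `MetadataCache` trait.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical minimal cache exposing only small primitives. */
final class MiniMetadataCache {
    private final Map<String, Integer> partitionsByTopic = new TreeMap<>();

    MiniMetadataCache add(String topic, int partitions) {
        partitionsByTopic.put(topic, partitions);
        return this;
    }

    boolean contains(String topic) {
        return partitionsByTopic.containsKey(topic);
    }

    Set<String> getAllTopics() {
        return new TreeSet<>(partitionsByTopic.keySet());
    }

    /** Partition labels such as "orders-0"; empty if the topic does not exist. */
    Set<String> getTopicPartitions(String topic) {
        return IntStream.range(0, partitionsByTopic.getOrDefault(topic, 0))
            .mapToObj(i -> topic + "-" + i)
            .collect(Collectors.toCollection(TreeSet::new));
    }

    /** Derived at the call site: the requested topics the cache does not contain. */
    static Set<String> nonExistingTopics(MiniMetadataCache cache, Set<String> topics) {
        return topics.stream()
            .filter(topic -> !cache.contains(topic))
            .collect(Collectors.toCollection(TreeSet::new));
    }

    /** Derived at the call site: every partition of every known topic. */
    static Set<String> allPartitions(MiniMetadataCache cache) {
        return cache.getAllTopics().stream()
            .flatMap(topic -> cache.getTopicPartitions(topic).stream())
            .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

Keeping only the primitives on the interface means each implementation has fewer methods to provide, while callers compose what they need.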
[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh
IgnacioAcunaF commented on pull request #10858: URL: https://github.com/apache/kafka/pull/10858#issuecomment-867206422 Hi @dajac, thanks again for your comments. I've updated the PR to take them into consideration. What do you think? Best regards
[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
junrao commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r657507051 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -106,7 +174,17 @@ object LogLoader extends Logging { loadSegmentFiles(params) }) -completeSwapOperations(swapFiles, params) +// Do the actual recovery for toRecoverSwapFiles, as discussed above. Review comment: Hmm, I am not sure why we need this step. We have processed all .swap files before, and no new .swap files should have been introduced by the time we get here. ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -167,21 +245,14 @@ object LogLoader extends Logging { * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than * the smallest offset .clean file could be part of an incomplete split operation. Such .swap files are also deleted * by this method. + * * @param params The parameters for the log being loaded from disk - * @return Set of .swap files that are valid to be swapped in as segment files + * @return Set of .swap files that are valid to be swapped in as segment files and index files Review comment: The PR description says "as a result, if at least one .swap file exists for a segment, all other files for the segment must exist as .cleaned files or .swap files. Therefore, we rename the .cleaned files to .swap files, then make them normal segment files." Are we implementing the renaming of .cleaned files to .swap files? 
## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,79 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction operation. We can simply rename them +// to regular segment files. But, before renaming, we should figure out which segments are +// compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. +// If sanity check fails, we cannot do the simple renaming, we must do a full recovery, which +// involves rebuilding all the index files and the producer state. +// We store segments that require renaming and recovery in this code block, and do the actual +// renaming and recovery later. +var minSwapFileOffset = Long.MaxValue +var maxSwapFileOffset = Long.MinValue +val toRenameSwapFiles = mutable.Set[File]() +val toRecoverSwapFiles = mutable.Set[File]() +swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "".foreach { f => + val baseOffset = offsetFromFile(f) + val segment = LogSegment.open(f.getParentFile, +baseOffset = baseOffset, +params.config, +time = params.time, +fileSuffix = Log.SwapFileSuffix) + try { +segment.sanityCheck(false) Review comment: It doesn't seem we need this since we call segment.sanityCheck() on all segments later in loadSegmentFiles(). 
## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,79 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction operation. We can simply rename them Review comment: It seems that those swap files could be the result of segment split too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r657507129 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), 
+ testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) +val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet + +def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = { + val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava)) + val description = new ConsumerGroupDescription(group, +true, +Collections.singleton(member1), +classOf[RangeAssignor].getName, +groupState, +new Node(1, "localhost", 9092)) + new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description))) +} + +def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => +map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } +} +def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => +map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } +} + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE)) +when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)) 
+doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any()) +doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any()) + +val (state, assignments) = groupService.collectGroupOffsets(group) +assertEquals(Some("Stable"), state) +assertTrue(assignments.nonEmpty) +// Results should have information for all assigned topic partition (even if there is not Offset's information at all, because they get fills with None) +// Results should have information only for unassigned topic partitions if and only if there is information about
[GitHub] [kafka] izzyacademy commented on pull request #10740: Kafka 8613 kip 633 drop default grace period streams
izzyacademy commented on pull request #10740: URL: https://github.com/apache/kafka/pull/10740#issuecomment-867189274 I rebased and created a brand new branch with the new changes. A new PR has been created https://github.com/apache/kafka/pull/10924/ Please take a look when you have a moment. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] izzyacademy commented on pull request #10924: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
izzyacademy commented on pull request #10924: URL: https://github.com/apache/kafka/pull/10924#issuecomment-867188534 @ableegoldman @mjsax @showuon @cadonna When you have a moment, please take a look. This is an update based on the feedback from PR #10740
[GitHub] [kafka] izzyacademy opened a new pull request #10924: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
izzyacademy opened a new pull request #10924: URL: https://github.com/apache/kafka/pull/10924
KIP-633 New APIs for Controlling Grace Period for Windowed Operations
- Added the API changes from KIP-633 for JoinWindows, SessionWindows, TimeWindows and SlidingWindows
- Renamed Windows.DEFAULT_GRACE_PERIOD_MS to DEPRECATED_OLD_24_HR_GRACE_PERIOD
- Added new constant Windows.NO_GRACE_PERIOD to avoid magic constants when 0 is specified as the grace period
- Added preliminary Java unit test cases for new API methods
- Replaced deprecated calls with their equivalents in the examples
- Replaced deprecated API calls in Scala tests with updated API method calls
- Added deprecation suppression in tests for deprecated API method calls in Java and Scala tests
modified: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java modified: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java modified: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java modified: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java modified: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java modified: streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java modified: streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java modified: streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java modified: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala modified: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala modified: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala modified: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala modified: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java modified:
streams/src/test/java/org/apache/kafka/streams/TopologyTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java modified: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java modified: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java modified:
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r657483179 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), 
+ testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) +val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet + +def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = { + val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava)) + val description = new ConsumerGroupDescription(group, +true, +Collections.singleton(member1), +classOf[RangeAssignor].getName, +groupState, +new Node(1, "localhost", 9092)) + new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description))) +} + +def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => +map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } +} Review comment: Yes, that's a good idea. I am going to unify both cases into a single argument matcher.
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r657481603 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), 
+ testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) +val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet + +def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = { + val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava)) + val description = new ConsumerGroupDescription(group, +true, +Collections.singleton(member1), +classOf[RangeAssignor].getName, +groupState, +new Node(1, "localhost", 9092)) + new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description))) +} + +def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => +map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } +} +def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => +map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } +} + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE)) +when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)) 
+doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any()) +doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any()) Review comment: In fact that was my first approach, but defining a when(...).thenReturn(...) for two different argument matchers on the same method throws a null pointer error in Mockito. I used doAnswer as a workaround. Taking the previous comment into account, though, I am now working on a single when(...).thenReturn(...), so it won't be a problem anymore.
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r657479764 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), 
+ testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) +val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet Review comment: You are absolutely right, this misses testTopicPartition3. I am going to set it explicitly.
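For readers following the thread, here is a minimal Java sketch of why deriving the unassigned set from the keys of the offsets map drops testTopicPartition3. It is not the test code from the PR: partitions are plain strings, the class and method names are hypothetical, and only the shape of the data (one partition with no offset entry at all) mirrors the quoted test.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; none of these names exist in Kafka.
final class UnassignedPartitionsSketch {

    // Mirrors the quoted approach: start from the keys of the offsets map.
    // A partition with no offset entry can never appear in the result.
    static Set<String> unassignedFromOffsets(Map<String, Long> offsets, Set<String> assigned) {
        Set<String> result = new TreeSet<>(offsets.keySet());
        result.removeAll(assigned);
        return result;
    }

    // Alternative: start from the full, explicitly listed set of partitions.
    static Set<String> unassignedExplicit(Set<String> allPartitions, Set<String> assigned) {
        Set<String> result = new TreeSet<>(allPartitions);
        result.removeAll(assigned);
        return result;
    }

    // Same shape as the quoted test data: testTopic1-0 and testTopic2-0 have no entry.
    static Map<String, Long> sampleOffsets() {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("testTopic1-1", 100L);
        offsets.put("testTopic1-2", null); // null offset for an assigned partition
        offsets.put("testTopic2-1", 100L);
        offsets.put("testTopic2-2", null); // null offset for an unassigned partition
        return offsets;
    }

    static Set<String> sampleAssigned() {
        return Set.of("testTopic1-0", "testTopic1-1", "testTopic1-2");
    }

    static Set<String> sampleAll() {
        return Set.of("testTopic1-0", "testTopic1-1", "testTopic1-2",
                      "testTopic2-0", "testTopic2-1", "testTopic2-2");
    }

    public static void main(String[] args) {
        System.out.println(unassignedFromOffsets(sampleOffsets(), sampleAssigned()));
        System.out.println(unassignedExplicit(sampleAll(), sampleAssigned()));
    }
}
```

The first call leaves out testTopic2-0 because it never had a key in the offsets map, which is the gap the review comment points at.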
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time
skaundinya15 commented on a change in pull request #10743: URL: https://github.com/apache/kafka/pull/10743#discussion_r657473433 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java ## @@ -29,9 +32,9 @@ */ @InterfaceStability.Evolving public class DeleteConsumerGroupsResult { -private final Map> futures; +private final Map> futures; -DeleteConsumerGroupsResult(final Map> futures) { +DeleteConsumerGroupsResult(Map> futures) {
Review comment: @dajac @mimaison It looks like the reason we have to use `KafkaFutureImpl` here is that the `SimpleAdminApiFuture` class has a class variable called `futures`, which is of type ` private final Map>`. As a result, we return `KafkaFutureImpl` from the `all()`, `get()` and other methods. We could change all of this to use `KafkaFuture` instead, but this would require us to change the following methods from `protected` to `public`:
```java
/**
 * If not already completed, sets the value returned by get() and related methods to the given
 * value.
 */
protected abstract boolean complete(T newValue);

/**
 * If not already completed, causes invocations of get() and related methods to throw the given
 * exception.
 */
protected abstract boolean completeExceptionally(Throwable newException);
```
The question is whether we should change these `protected` methods to `public` so that we can keep returning `KafkaFuture`, or whether it is okay to return `KafkaFutureImpl`. I think it could be worth changing the `KafkaFuture` methods from `protected` to `public` so that we maintain the invariant of always returning `KafkaFuture`, but I am not sure whether that would require getting consensus on the mailing list.
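As an illustration of the trade-off raised above, here is a minimal sketch using hypothetical stand-in classes (`SketchFuture`, `SketchFutureImpl`, `SketchResult`), not the real Kafka types. It assumes the internal code holds the concrete implementation, where `complete` is widened to `public`, while the result object exposes only the abstract base type. Note that in Java `protected` also grants package access, so the sketch demonstrates the typing, not access enforcement.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo driver; all class names in this block are illustrative.
final class FutureVisibilitySketch {
    public static void main(String[] args) {
        SketchFutureImpl<String> impl = new SketchFutureImpl<>();
        Map<String, SketchFutureImpl<String>> impls = new HashMap<>();
        impls.put("group-a", impl);

        SketchResult result = new SketchResult(impls);
        impl.complete("deleted"); // internal code completes via the Impl type
        System.out.println(result.futures().get("group-a").getNow("pending"));
    }
}

// Stand-in for an abstract future whose completion methods are protected.
abstract class SketchFuture<T> {
    protected abstract boolean complete(T newValue);
    public abstract boolean isDone();
    public abstract T getNow(T valueIfAbsent);
}

// Stand-in for the concrete implementation: it widens complete() to public.
final class SketchFutureImpl<T> extends SketchFuture<T> {
    private boolean done;
    private T value;

    @Override
    public boolean complete(T newValue) {
        if (done) {
            return false; // already completed, value is left unchanged
        }
        value = newValue;
        done = true;
        return true;
    }

    @Override
    public boolean isDone() {
        return done;
    }

    @Override
    public T getNow(T valueIfAbsent) {
        return done ? value : valueIfAbsent;
    }
}

// Result type that accepts Impl futures but exposes only the base type.
final class SketchResult {
    private final Map<String, SketchFuture<String>> futures;

    SketchResult(Map<String, SketchFutureImpl<String>> impls) {
        // Copying into a map typed on the base class hides the Impl type from callers.
        this.futures = new HashMap<>(impls);
    }

    Map<String, SketchFuture<String>> futures() {
        return futures;
    }
}
```

Under these assumptions the public surface stays on the base type without touching the visibility of `complete` on the abstract class; whether that is workable for `SimpleAdminApiFuture` is the open question in the comment.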
[GitHub] [kafka] omkreddy commented on a change in pull request #10915: Enable connecting VS Code remote debugger
omkreddy commented on a change in pull request #10915: URL: https://github.com/apache/kafka/pull/10915#discussion_r657466048 ## File path: tests/README.md ## @@ -51,6 +51,40 @@ bash tests/docker/ducker-ak up -j 'openjdk:11'; tests/docker/run_tests.sh ``` REBUILD="t" bash tests/docker/run_tests.sh ``` +* Debug tests in VS Code: + - Run test with `--debug` flag (can be before or after file name): Review comment: It looks like this conflicts with the existing usage for running tests with debug logs (see above): `_DUCKTAPE_OPTIONS="--debug" bash tests/docker/run_tests.sh | tee debug_logs.txt`. Can we use a different option to enable the debugger, e.g. `--debugpy` or `--enable-debugger`?
[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368464#comment-17368464 ] Martin Sundeqvist commented on KAFKA-12559: --- [~ableegoldman] [~simplyamuthan] Hi, I can see it's been a while since the last update. I'm new at this, and would like to take a crack at it, unless things are already moving forward? > Add a top-level Streams config for bounding off-heap memory > --- > > Key: KAFKA-12559 > URL: https://issues.apache.org/jira/browse/KAFKA-12559 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: amuthan Ganeshan >Priority: Major > Labels: needs-kip, newbie, newbie++ > > At the moment we provide an example of how to bound the memory usage of > rocksdb in the [Memory > Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb] > section of the docs. This requires implementing a custom RocksDBConfigSetter > class and setting a number of rocksdb options for relatively advanced > concepts and configurations. It seems a fair number of users either fail to > find this or consider it to be for more advanced use cases/users. But RocksDB > can eat up a lot of off-heap memory and it's not uncommon for users to come > across a {{RocksDBException: Cannot allocate memory}} > It would probably be a much better user experience if we implemented this > memory bound out-of-the-box and just gave users a top-level StreamsConfig to > tune the off-heap memory given to rocksdb, like we have for on-heap cache > memory with cache.max.bytes.buffering. More advanced users can continue to > fine-tune their memory bounding and apply other configs with a custom config > setter, while new or more casual users can cap the off-heap memory without > getting their hands dirty with rocksdb. 
> I would propose to add the following top-level config: > rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid > values are [0, inf] > I'd also want to consider adding a second, lower priority top-level config to > give users a knob for adjusting how much of that total off-heap memory goes > to the block cache + index/filter blocks, and how much of it is afforded to > the write buffers. I'm struggling to come up with a good name for this > config, but it would be something like > rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default > to 0.5, valid values are [0, 1] -- This message was sent by Atlassian Jira (v8.3.4#803005)
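To make the arithmetic behind the two proposed configs concrete, here is a rough Java sketch. It is not an implementation of the ticket: the class and method names are hypothetical, and it assumes the ratio is the fraction of the off-heap budget given to the write buffers (memtables), with the remainder going to the block cache and index/filter blocks. The ticket leaves the exact semantics of the ratio open.

```java
import java.util.Optional;

// Hypothetical illustration of splitting an off-heap budget; not Kafka Streams code.
final class OffHeapBudgetSketch {

    // The ticket proposes -1 as the default, meaning "unbounded".
    static final long UNBOUNDED = -1L;

    /**
     * Splits the total off-heap budget into {writeBufferBytes, blockCacheBytes}.
     * Returns an empty Optional when the budget is unbounded.
     * Assumption: memtableRatio is the share given to the write buffers.
     */
    static Optional<long[]> split(long maxBytesOffHeap, double memtableRatio) {
        if (memtableRatio < 0.0 || memtableRatio > 1.0) {
            throw new IllegalArgumentException("ratio must be in [0, 1]: " + memtableRatio);
        }
        if (maxBytesOffHeap == UNBOUNDED) {
            return Optional.empty();
        }
        if (maxBytesOffHeap < 0) {
            throw new IllegalArgumentException("budget must be -1 or >= 0: " + maxBytesOffHeap);
        }
        long writeBufferBytes = (long) (maxBytesOffHeap * memtableRatio);
        long blockCacheBytes = maxBytesOffHeap - writeBufferBytes; // remainder, so the parts sum to the total
        return Optional.of(new long[] {writeBufferBytes, blockCacheBytes});
    }

    public static void main(String[] args) {
        long[] parts = split(1024L, 0.5).orElseThrow(IllegalStateException::new);
        System.out.println("write buffers: " + parts[0] + ", block cache: " + parts[1]);
    }
}
```

Computing the block cache share as the remainder keeps the two parts summing exactly to the configured total, regardless of rounding in the first multiplication.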
[GitHub] [kafka] izzyacademy commented on pull request #10740: Kafka 8613 kip 633 drop default grace period streams
izzyacademy commented on pull request #10740: URL: https://github.com/apache/kafka/pull/10740#issuecomment-867140579
> Hey @izzyacademy , I'm taking a look but just a quick heads up: the build failed to compile, looks like the problem is some of the demo classes use one of the now-deprecated methods and need to be migrated to the new API: `TemperatureDemo`, `PageViewTypedDemo`, and `PageViewUntypedDemo`
This has just been addressed. Thanks @ableegoldman for your recommendations yesterday.
[GitHub] [kafka] jolshan opened a new pull request #10923: KAFKA-12976: Remove UNSUPPORTED_VERSION error from delete and describe topics calls
jolshan opened a new pull request #10923: URL: https://github.com/apache/kafka/pull/10923 Removed the condition to throw the error. Now we only throw when we didn't find the topic ID. Updated the test for IBP < 2.8 that tries to delete topics using ID. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r657421072 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition +).asJava Review comment: Perfect, makes sense. Would do that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #10918: KAFKA-12756: Update ZooKeeper to v3.6.3
rondagostino commented on pull request #10918: URL: https://github.com/apache/kafka/pull/10918#issuecomment-86780
System test results before making the jar test dependency: https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-06-23--001.1624469608--rondagostino--KAFKA-12756--bbc88d7e8/report.html
```
SESSION REPORT (ALL TESTS)
ducktape version: 0.8.1
session_id: 2021-06-23--001
run time: 175 minutes 4.253 seconds
tests run: 867
passed: 659
failed: 11
ignored: 197
```
- `kafkatest.tests.core.zookeeper_authorizer_test` failed for `metadata_quorum:REMOTE_KRAFT` -- unrelated to this change since ZooKeeper isn't even involved.
- `kafkatest.tests.core.upgrade_test` failed all 3 tests from version 2.8.0; this was due to 2.8.0 not being part of the Vagrant image. I added a commit to this PR to fix that and confirmed that the non-compression flavor of that test for 2.8.0 passed locally with Vagrant (this PR already had the change to add it to the Docker image for Docker-based system tests).
- `kafkatest.sanity_checks.test_kafka_version` failed due to `kafka-topics --zookeeper` being used against the current version rather than the 0.8.2 version; I added a commit to this PR to swap the node versions so the correct broker will get the request, and confirmed locally that the test now passes.
- `kafkatest.tests.streams.streams_eos_test` had 4 test failures.
- `kafkatest.tests.streams.streams_upgrade_test` had 2 test failures.
I believe the streams failures are unrelated because I downgraded to ZooKeeper v3.5.9 locally and ran `kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_version_probing_upgrade` -- it still failed.
[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
g1geordie commented on pull request #10190: URL: https://github.com/apache/kafka/pull/10190#issuecomment-867105129 @bbejeck Thanks for the review. You are right. Please help me review it again.
[jira] [Commented] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368353#comment-17368353 ] Matthias J. Sax commented on KAFKA-12984: - [~ableegoldman] – is KS affected by this issue? Even if we use our own assignor, it seems that the issue with regard to "subscription state" could also affect KS? > Cooperative sticky assignor can get stuck with invalid SubscriptionState > input metadata > --- > > Key: KAFKA-12984 > URL: https://issues.apache.org/jira/browse/KAFKA-12984 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > Some users have reported seeing their consumer group become stuck in the > CompletingRebalance phase when using the cooperative-sticky assignor. Based > on the request metadata we were able to deduce that multiple consumers were > reporting the same partition(s) in their "ownedPartitions" field of the > consumer protocol. Since this is an invalid state, the input causes the > cooperative-sticky assignor to detect that something is wrong and throw an > IllegalStateException. If the consumer application is set up to simply retry, > this will cause the group to appear to hang in the rebalance state. > The "ownedPartitions" field is encoded based on the ConsumerCoordinator's > SubscriptionState, which was assumed to always be up to date. However there > may be cases where the consumer has dropped out of the group but fails to > clear the SubscriptionState, allowing it to report some partitions as owned > that have since been reassigned to another member. > We should (a) fix the sticky assignment algorithm to resolve cases of > improper input conditions by invalidating the "ownedPartitions" in cases of > double ownership, and (b) shore up the ConsumerCoordinator logic to better > handle rejoining the group and keeping its internal state consistent. 
See > KAFKA-12983 for more details on (b) -- This message was sent by Atlassian Jira (v8.3.4#803005)
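Part (a) of the issue description, invalidating "ownedPartitions" that are claimed by more than one member, could look roughly like the sketch below. It is illustrative only: `OwnedPartitionsSketch`, `invalidateDoubleOwnership`, and the use of plain integers for partitions and strings for member ids are assumptions made here, not the actual sticky assignor code or the fix that was eventually merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: not the real assignor implementation.
final class OwnedPartitionsSketch {

    // Returns a copy of the claims in which every partition claimed by more
    // than one member has been removed from all of its claimants.
    static Map<String, List<Integer>> invalidateDoubleOwnership(Map<String, List<Integer>> claimed) {
        // Count how many distinct members claim each partition.
        Map<Integer, Integer> claimCounts = new HashMap<>();
        for (List<Integer> partitions : claimed.values()) {
            for (Integer partition : new HashSet<>(partitions)) {
                claimCounts.merge(partition, 1, Integer::sum);
            }
        }

        // Keep only the partitions that exactly one member claims.
        Map<String, List<Integer>> result = new HashMap<>();
        for (Map.Entry<String, List<Integer>> entry : claimed.entrySet()) {
            List<Integer> kept = new ArrayList<>();
            for (Integer partition : entry.getValue()) {
                if (claimCounts.get(partition) == 1) {
                    kept.add(partition);
                }
            }
            result.put(entry.getKey(), kept);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> claimed = new HashMap<>();
        claimed.put("consumer-a", Arrays.asList(0, 1));
        claimed.put("consumer-b", Arrays.asList(1, 2));
        // Partition 1 is claimed twice, so neither consumer keeps it.
        System.out.println(invalidateDoubleOwnership(claimed));
    }
}
```

Dropping the contested partition from every claimant, rather than picking a winner, matches the wording "invalidating the ownedPartitions in cases of double ownership"; the assignor would then be free to hand that partition out as if it were unowned.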
[jira] [Commented] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368354#comment-17368354 ] Michael Bingham commented on KAFKA-12984: - Does this issue potentially apply to {{StreamsPartitionAssignor}} as well?
[jira] [Issue Comment Deleted] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Bingham updated KAFKA-12984: Comment: was deleted (was: Does this issue potentially apply to {{StreamsPartitionAssignor}} as well?)
[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r657306351 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/TopicCollection.java ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.Uuid; + +import java.util.Collection; + +public class TopicCollection { + +private Collection topicNames; +private Collection topicIds; +private final TopicAttribute attribute; + +public enum TopicAttribute { +TOPIC_NAME, TOPIC_ID Review comment: I guess this prevents the unsupported operation exception in the methods themselves. However, we would need to convert the TopicCollection to a TopicNameCollection before they could access the collection of names, so I suspect we'd still need to handle that case there.
[jira] [Created] (KAFKA-12986) Throttled Replicas validator should validate that the proposed value is parseable
David Mao created KAFKA-12986: - Summary: Throttled Replicas validator should validate that the proposed value is parseable Key: KAFKA-12986 URL: https://issues.apache.org/jira/browse/KAFKA-12986 Project: Kafka Issue Type: Bug Components: admin, core Reporter: David Mao The ThrottledReplicaListValidator currently allows a string like leader.replication.throttled.replicas=,0:1 to be set which is unparseable by the TopicConfig callback handler. For robustness, the validator should also validate that the property can be parsed.
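A stricter check of the kind requested here might look like the following sketch. It is not the actual ThrottledReplicaListValidator: the class and method names are hypothetical, and it assumes the value is either empty, the wildcard `*`, or a comma-separated list of `partitionId:brokerId` integer pairs. Each entry is parsed individually, so a leading or trailing comma is rejected.

```java
// Hypothetical sketch: not the real ThrottledReplicaListValidator.
final class ThrottledReplicasSketch {

    // Returns true only if every comma-separated entry parses as "int:int".
    // Assumption: an empty value and the "*" wildcard are both acceptable.
    static boolean isParseable(String value) {
        String trimmed = value.trim();
        if (trimmed.isEmpty() || trimmed.equals("*")) {
            return true;
        }
        // The -1 limit keeps empty entries, so ",0:1" and "0:1," are detected.
        for (String entry : trimmed.split(",", -1)) {
            String[] parts = entry.trim().split(":", -1);
            if (parts.length != 2) {
                return false;
            }
            try {
                Integer.parseInt(parts[0].trim());
                Integer.parseInt(parts[1].trim());
            } catch (NumberFormatException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isParseable("0:1,1:2"));
        System.out.println(isParseable(",0:1"));
    }
}
```

Parsing each entry, instead of matching the whole string against a single pattern, means the validator and the code that later reads the property cannot disagree about what counts as well formed, provided both use the same parsing routine.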
[GitHub] [kafka] nnordrum closed pull request #2664: MINOR: Added additional -start/-stop files for consistency
nnordrum closed pull request #2664: URL: https://github.com/apache/kafka/pull/2664
[GitHub] [kafka] nnordrum closed pull request #3518: WIP: SimpleRegexAclAuthorizer
nnordrum closed pull request #3518: URL: https://github.com/apache/kafka/pull/3518
[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
guozhangwang commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r657231704 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1200,7 +1179,7 @@ private long getCacheSizePerThread(final int numStreamThreads) { if (numStreamThreads == 0) { return totalCacheSize; } -return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); +return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0)); Review comment: If more than one named topology has global stores, should the global topologies share one cache sub-space? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -345,8 +343,17 @@ private SinkNodeFactory(final String name, } } +public void setTopologyName(final String namedTopology) { Review comment: Could we move the logic of `namedTopology.setTopologyName(topologyName);` into the constructor of `NamedTopologyStreamsBuilder(final String topologyName)` itself, and then call the constructor of `NamedTopology` directly, so that we can still have a final field in line 138 above? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -131,15 +135,9 @@ private StreamsConfig config = null; // The name of the topology this builder belongs to, or null if none -private final String namedTopology; - -public InternalTopologyBuilder() { -this.namedTopology = null; -} +private String namedTopology; Review comment: nit: maybe better be `topologyName`? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -364,6 +371,10 @@ public synchronized final StreamsConfig getStreamsConfig() { return config; } +public String namedTopology() { Review comment: Ditto, I feel it's better to let `topologyName` return a String. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -1065,14 +1086,22 @@ private void buildProcessorNode(final Map> pro return Collections.unmodifiableMap(globalStateStores); } -public Set allStateStoreName() { +public Set allStateStoreNames() { Review comment: Nice find. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -1216,39 +1245,26 @@ private void setRegexMatchedTopicToStateStore() { } } -public synchronized Pattern earliestResetTopicsPattern() { -return resetTopicsPattern(earliestResetTopics, earliestResetPatterns); +public boolean hasOffsetResetOverrides() { +return !(earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty() +&& latestResetTopics.isEmpty() && latestResetPatterns.isEmpty()); } -public synchronized Pattern latestResetTopicsPattern() { -return resetTopicsPattern(latestResetTopics, latestResetPatterns); -} - -private Pattern resetTopicsPattern(final Set resetTopics, - final Set resetPatterns) { -final List topics = maybeDecorateInternalSourceTopics(resetTopics); - -return buildPattern(topics, resetPatterns); -} - -private static Pattern buildPattern(final Collection sourceTopics, -final Collection sourcePatterns) { -final StringBuilder builder = new StringBuilder(); - -for (final String topic : sourceTopics) { -builder.append(topic).append("|"); -} - -for (final Pattern sourcePattern : sourcePatterns) { -builder.append(sourcePattern.pattern()).append("|"); -} - -if (builder.length() > 0) { -builder.setLength(builder.length() - 1); -return Pattern.compile(builder.toString()); +public OffsetResetStrategy offsetResetStrategy(final String topic) { +if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) || +earliestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) { +return EARLIEST; +} else if (maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) || 
+latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) { +return LATEST; +} else if (maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic) Review comment: Could you elaborate a bit on the `NONE` case? Not sure I fully follow here. ## File path:
[GitHub] [kafka] feyman2016 commented on pull request #10593: KAFKA-10800 Enhance the test for validation when the state machine creates a snapshot
feyman2016 commented on pull request #10593: URL: https://github.com/apache/kafka/pull/10593#issuecomment-866954962 @jsancio Thanks! @mumrah Hi, could you kindly help to review and merge? Thanks!
[GitHub] [kafka] mumrah commented on a change in pull request #10900: KAFKA-12967; KRaft broker should forward DescribeQuorum to controller
mumrah commented on a change in pull request #10900: URL: https://github.com/apache/kafka/pull/10900#discussion_r657234456 ## File path: core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.server.IntegrationTestUtils.connectAndReceive +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest +import org.apache.kafka.common.requests.{DescribeQuorumRequest, DescribeQuorumResponse} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters._ + +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT) +class DescribeQuorumRequestTest(cluster: ClusterInstance) { Review comment: Is it worth adding a test for ZK mode here? 
Just to see that it fails ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -217,6 +227,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) case ApiKeys.ALLOCATE_PRODUCER_IDS => maybeForwardToController(request, handleAllocateProducerIdsRequest) +case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request) Review comment: Hm, so we are _only_ allowing forwarding with this `forwardToControllerOrFail` since this RPC did not exist before? Are there other recently added controller-only RPCs that need this treatment? ## File path: core/src/test/java/kafka/test/annotation/Type.java ## @@ -28,7 +28,7 @@ * The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations. */ public enum Type { -RAFT { +KRAFT { Review comment: Seems fine to change this here. I've been meaning to do this anyways. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 edited a comment on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 edited a comment on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-866823035 @guozhangwang / @cadonna I made some tweaks to the code and also started testing with 1M keys. Now I see throughput differences of around 0.3 ops/s for range queries and 0.15 ops/s for putAll. Here is the comparison:
```
testPersistentRangeQueryPerformance original
Benchmark                                                            Mode  Cnt  Score   Error  Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.131 ± 0.028  ops/s

testPersistentPutAllPerformance original
Benchmark                                                            Mode  Cnt  Score   Error  Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance      thrpt   15  0.919 ± 0.037  ops/s

testPersistentRangeQueryPerformance bytebuffer
Benchmark                                                            Mode  Cnt  Score   Error  Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.442 ± 0.038  ops/s

testPersistentPutAllPerformance bytebuffer
Benchmark                                                            Mode  Cnt  Score   Error  Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance      thrpt   15  1.065 ± 0.041  ops/s
```
I needed to set the ByteOrder after creating the DirectByteBuffer object. And for putAll, I needed to flip the buffer before calling `put` on the batch. The next step for me is to create a Kafka Streams app and test throughput. I will post those results here once I have them as well.
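The two adjustments mentioned in this comment, setting a byte order on the direct buffer and flipping it before handing it to `put`, can be illustrated with plain `java.nio` calls. This is a minimal sketch, not the PR's code: `DirectBufferSketch` and `toDirectBuffer` are hypothetical names, the byte order is left as a parameter because the comment does not say which one was chosen, and no RocksDB call is shown.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: not the code from the pull request.
final class DirectBufferSketch {

    // Copies the bytes into a direct buffer and prepares it for reading.
    static ByteBuffer toDirectBuffer(byte[] bytes, ByteOrder order) {
        // Allocate off-heap memory and set the byte order explicitly.
        ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length).order(order);
        buffer.put(bytes);
        // After put(), position is at the end of the data; flip() sets
        // limit = position and position = 0 so a consumer sees the data.
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        byte[] key = "key-1".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = toDirectBuffer(key, ByteOrder.nativeOrder());
        System.out.println(buffer.isDirect() + " " + buffer.remaining());
    }
}
```

Without the `flip()`, the buffer's position would sit at its limit after the write, so any API that reads from position to limit would see zero remaining bytes.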
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
wcarlson5 commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r657217991 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.namedTopology(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { Review comment: We would still need to check. Because the thread count could still be dropped to 0 with `removeThread()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #10854: KAFKA-12717: Remove internal Connect converter properties (KIP-738)
C0urante commented on a change in pull request #10854: URL: https://github.com/apache/kafka/pull/10854#discussion_r657202256 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -361,55 +324,23 @@ protected static ConfigDef baseConfigDef() { .withClientSslSupport(); } -private void logInternalConverterDeprecationWarnings(Map props) { -String[] deprecatedConfigs = new String[] { -INTERNAL_KEY_CONVERTER_CLASS_CONFIG, -INTERNAL_VALUE_CONVERTER_CLASS_CONFIG -}; -for (String config : deprecatedConfigs) { -if (props.containsKey(config)) { -Class internalConverterClass = getClass(config); -logDeprecatedProperty(config, internalConverterClass.getCanonicalName(), INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null); -if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) { -// log the properties for this converter ... -for (Map.Entry propEntry : originalsWithPrefix(config + ".").entrySet()) { -String prop = propEntry.getKey(); -String propValue = propEntry.getValue().toString(); -String defaultValue = JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null; -logDeprecatedProperty(config + "." + prop, propValue, defaultValue, config); -} -} +private void logInternalConverterRemovalWarnings(Map props) { +List removedProperties = new ArrayList<>(); +for (String property : Arrays.asList("internal.key.converter", "internal.value.converter")) { +if (props.containsKey(property)) { +removedProperties.add(property); } +removedProperties.addAll(originalsWithPrefix(property + ".").keySet()); } -} - -private void logDeprecatedProperty(String propName, String propValue, String defaultValue, String prefix) { -String prefixNotice = prefix != null -? " (along with all configuration for '" + prefix + "')" -: ""; -if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) { -log.info( -"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. 
" -+ "The specified value '{}' matches the default, so this property can be safely removed from the worker configuration.", -propName, -prefixNotice, -propValue -); -} else if (defaultValue != null) { +if (!removedProperties.isEmpty()) { log.warn( -"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. " -+ "The specified value '{}' does NOT match the default and recommended value '{}'.", -propName, -prefixNotice, -propValue, -defaultValue -); -} else { -log.warn( -"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release.", -propName, -prefixNotice -); +"The worker has been configured with one or more internal converter properties ({}). " ++ "Support for these properties was dropped in version 3.0, and specifying them will " Review comment: Sure, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #10854: KAFKA-12717: Remove internal Connect converter properties (KIP-738)
C0urante commented on a change in pull request #10854: URL: https://github.com/apache/kafka/pull/10854#discussion_r657197034 ## File path: docs/upgrade.html ## @@ -76,6 +76,12 @@ Notable changes in 3 understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See https://cwiki.apache.org/confluence/x/zJONCg;>KIP-732 for more details. + +The Connect internal.key.converter and internal.value.converter properties have been completely https://cwiki.apache.org/confluence/x/2YDOCg;>removed. +Workers are now hardcoded to use the JSON converter with schemas.enable set to false. If your cluster has been using +a different internal key or value converter, you can follow the migration steps outlined in https://cwiki.apache.org/confluence/x/2YDOCg;>KIP-738 +to safely upgrade your Connect cluster to 3.0. Review comment: Good call, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
bbejeck commented on pull request #10190: URL: https://github.com/apache/kafka/pull/10190#issuecomment-866910927 @g1geordie apologies for letting this go for so long. I'm looking now and let's see if we can't get this in over the next few days.
[GitHub] [kafka] rhauch commented on pull request #10854: KAFKA-12717: Remove internal Connect converter properties (KIP-738)
rhauch commented on pull request #10854: URL: https://github.com/apache/kafka/pull/10854#issuecomment-866908588 For the record, the builds are mostly green; the few that failed had only failures unrelated to this PR.
[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh
dajac commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r657158355 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition +).asJava Review comment: I would remove all the comments and only put one before `offsets` which explains that certain partitions are not present and certain have `null` or something along these lines. 
## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 ) +val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet + +def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = { Review comment: `describeGroupsResult` is used only once in the test. It seems that we could simply declare a variable which contains the result that we want to return. ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val
[GitHub] [kafka] rhauch commented on a change in pull request #10854: KAFKA-12717: Remove internal Connect converter properties (KIP-738)
rhauch commented on a change in pull request #10854: URL: https://github.com/apache/kafka/pull/10854#discussion_r657156175 ## File path: docs/upgrade.html ## @@ -76,6 +76,12 @@ Notable changes in 3 understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See https://cwiki.apache.org/confluence/x/zJONCg;>KIP-732 for more details. + +The Connect internal.key.converter and internal.value.converter properties have been completely https://cwiki.apache.org/confluence/x/2YDOCg;>removed. +Workers are now hardcoded to use the JSON converter with schemas.enable set to false. If your cluster has been using +a different internal key or value converter, you can follow the migration steps outlined in https://cwiki.apache.org/confluence/x/2YDOCg;>KIP-738 +to safely upgrade your Connect cluster to 3.0. Review comment: Might be good to mention that these have been deprecated since AK 2.0. This is similar to other items earlier in this list that mention the earlier version when defaults changed. Maybe something like: ```suggestion The Connect internal.key.converter and internal.value.converter properties have been completely https://cwiki.apache.org/confluence/x/2YDOCg;>removed. The use of these Connect worker properties has been deprecated since version 2.0.0. Workers are now hardcoded to use the JSON converter with schemas.enable set to false. If your cluster has been using a different internal key or value converter, you can follow the migration steps outlined in https://cwiki.apache.org/confluence/x/2YDOCg;>KIP-738 to safely upgrade your Connect cluster to 3.0. 
``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -361,55 +324,23 @@ protected static ConfigDef baseConfigDef() { .withClientSslSupport(); } -private void logInternalConverterDeprecationWarnings(Map props) { -String[] deprecatedConfigs = new String[] { -INTERNAL_KEY_CONVERTER_CLASS_CONFIG, -INTERNAL_VALUE_CONVERTER_CLASS_CONFIG -}; -for (String config : deprecatedConfigs) { -if (props.containsKey(config)) { -Class internalConverterClass = getClass(config); -logDeprecatedProperty(config, internalConverterClass.getCanonicalName(), INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null); -if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) { -// log the properties for this converter ... -for (Map.Entry propEntry : originalsWithPrefix(config + ".").entrySet()) { -String prop = propEntry.getKey(); -String propValue = propEntry.getValue().toString(); -String defaultValue = JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null; -logDeprecatedProperty(config + "." + prop, propValue, defaultValue, config); -} -} +private void logInternalConverterRemovalWarnings(Map props) { +List removedProperties = new ArrayList<>(); +for (String property : Arrays.asList("internal.key.converter", "internal.value.converter")) { +if (props.containsKey(property)) { +removedProperties.add(property); } +removedProperties.addAll(originalsWithPrefix(property + ".").keySet()); } -} - -private void logDeprecatedProperty(String propName, String propValue, String defaultValue, String prefix) { -String prefixNotice = prefix != null -? " (along with all configuration for '" + prefix + "')" -: ""; -if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) { -log.info( -"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. 
" -+ "The specified value '{}' matches the default, so this property can be safely removed from the worker configuration.", -propName, -prefixNotice, -propValue -); -} else if (defaultValue != null) { +if (!removedProperties.isEmpty()) { log.warn( -"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. " -+ "The specified value '{}' does NOT match the default and recommended value '{}'.", -propName, -prefixNotice, -propValue, -defaultValue -); -} else { -log.warn(
[jira] [Resolved] (KAFKA-12482) Remove deprecated rest.host.name and rest.port Connect worker configs
[ https://issues.apache.org/jira/browse/KAFKA-12482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-12482. --- Reviewer: Randall Hauch Resolution: Fixed Merged to `trunk`. > Remove deprecated rest.host.name and rest.port Connect worker configs > - > > Key: KAFKA-12482 > URL: https://issues.apache.org/jira/browse/KAFKA-12482 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Kalpesh Patel >Priority: Critical > Fix For: 3.0.0 > > > The following Connect worker configuration properties were deprecated and > should be removed in 3.0.0: > * {{rest.host.name}} (deprecated in > [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) > * {{rest.port}} (deprecated in > [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) > See KAFKA-12717 for removing the internal converter configurations: > * {{internal.key.converter}} (deprecated in > [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) > * {{internal.value.converter}} (deprecated in > [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rhauch merged pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs
rhauch merged pull request #10841: URL: https://github.com/apache/kafka/pull/10841 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r657148983 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + +private final RestartRequest request; +private final ConnectorStateInfo stateInfo; +private final Collection idsToRestart; + +/** + * Create a new request to restart a connector and optionally its tasks. 
+ * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ +public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { +this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); +this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); +// Collect the task IDs to stop and restart (may be none) +idsToRestart = Collections.unmodifiableList( +stateInfo.tasks() +.stream() +.filter(this::isRestarting) +.map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())) +.collect(Collectors.toList()) +); +} + +/** + * Get the connector name. + * + * @return the name of the connector; never null + */ +public String connectorName() { +return request.connectorName(); +} + +/** + * Get the original {@link RestartRequest}. + * + * @return the restart request; never null + */ +public RestartRequest restartRequest() { +return request; +} + +/** + * Get the {@link ConnectorStateInfo} that reflects the current state of the connector except with the {@code status} + * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that + * are to be restarted, based upon the {@link #restartRequest() restart request}. + * + * @return the connector state info that reflects the restart plan; never null + */ +public ConnectorStateInfo restartConnectorStateInfo() { +return stateInfo; +} + +/** + * Get the immutable collection of {@link ConnectorTaskId} for all tasks to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return the IDs of the tasks to be restarted; never null but possibly empty + */ +public Collection taskIdsToRestart() { +return idsToRestart; +} + +/** + * Determine whether the {@link Connector} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. 
+ * + * @return true if the {@link Connector} instance is to be restarted, or false otherwise + */ +public boolean restartConnector() { +return isRestarting(stateInfo.connector()); +} + +/** + * Determine whether at least one {@link Task} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return true if any {@link Task} instances are to be restarted, or false if none are to be restarted + */ +public boolean restartAnyTasks() { Review comment: Renamed to shouldRestartTasks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
jlprat commented on pull request #10840: URL: https://github.com/apache/kafka/pull/10840#issuecomment-866870832 @ableegoldman and @cadonna thanks a lot for your reviews! I think I addressed all of your comments. Let me know what you think about them.
[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
jlprat commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r657138308 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java ## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.TaskMetadata; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + + +public class TaskMetadataImplTest { + +public static final TaskId TASK_ID = new TaskId(1, 2); +public static final TopicPartition TP_0 = new TopicPartition("t", 0); +public static final TopicPartition TP_1 = new TopicPartition("t", 1); +public static final Set TOPIC_PARTITIONS = mkSet(TP_0, TP_1); +public static final Map COMMITTED_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L)); +public static final Map END_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L)); +public static final Optional TIME_CURRENT_IDLING_STARTED = Optional.of(3L); + +private TaskMetadata taskMetadata; + +@Before +public void setUp() { +taskMetadata = new TaskMetadataImpl( +TASK_ID, +TOPIC_PARTITIONS, +COMMITTED_OFFSETS, +END_OFFSETS, +TIME_CURRENT_IDLING_STARTED); +} + +@Test +public void shouldNotAllowModificationOfInternalStateViaGetters() { +assertTrue(isUnmodifiable(taskMetadata.topicPartitions())); +assertTrue(isUnmodifiable(taskMetadata.committedOffsets())); +assertTrue(isUnmodifiable(taskMetadata.endOffsets())); +} + +@Test +public void shouldFollowEqualsAndHasCodeContract() { Review comment: @cadonna For hash code and equals contract validation, I decided to keep it under one single test, and validate the positive cases and any of the reasons why it might fall into the negative case. 
Please note that objects differing only in committed offsets, end offsets, and/or the time current idling started will be considered equal. ## File path: streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java ## @@ -55,6 +62,63 @@ public void shouldNotAllowModificationOfInternalStateViaGetters() { assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames())); } +@Test +public void shouldFollowHashCodeAndEqualsContract() { Review comment: @cadonna For hash code and equals contract validation, I decided to keep it under one single test, and validate the positive case and any of the reasons why it might fall into the negative case. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package
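The equality rule stated at the top of this part of the message (offsets and idling time do not take part in equality) can be illustrated with a minimal sketch. `TaskInfoSketch` and its fields are hypothetical stand-ins chosen for illustration; this is not the actual `TaskMetadataImpl` code, and it assumes equality is based on the task ID and topic partitions only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified stand-in for a task-metadata class; not the real TaskMetadataImpl.
final class TaskInfoSketch {
    private final String taskId;
    private final Set<String> topicPartitions;
    // Assumption: offsets are runtime state and deliberately ignored by equals/hashCode.
    private final Map<String, Long> committedOffsets;

    TaskInfoSketch(final String taskId,
                   final Set<String> topicPartitions,
                   final Map<String, Long> committedOffsets) {
        this.taskId = Objects.requireNonNull(taskId, "taskId may not be null");
        this.topicPartitions = Collections.unmodifiableSet(new HashSet<>(topicPartitions));
        this.committedOffsets = Collections.unmodifiableMap(new HashMap<>(committedOffsets));
    }

    Map<String, Long> committedOffsets() {
        return committedOffsets;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TaskInfoSketch)) {
            return false;
        }
        final TaskInfoSketch other = (TaskInfoSketch) o;
        // Only identity-like fields are compared; committedOffsets is skipped on purpose.
        return taskId.equals(other.taskId) && topicPartitions.equals(other.topicPartitions);
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals to keep the equals/hashCode contract.
        return Objects.hash(taskId, topicPartitions);
    }
}
```

Two instances built with the same ID and partitions but different offsets compare equal and share a hash code, which is the positive case a single contract test would cover alongside the negative cases.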
[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
jlprat commented on pull request #10840: URL: https://github.com/apache/kafka/pull/10840#issuecomment-866866991 Needed to rebase as there were some conflicts with trunk, hence the force push. I applied the changes in separate commits:
- One for method renames, formatting and small refactors (like the unmodifiable collections)
- One for adding tests
- One to fix the regression caused by the unmodifiable collections in the constructor.
[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
jlprat commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r657134887 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java ## @@ -57,7 +57,11 @@ public ThreadMetadataImpl(final String threadName, final Set standbyTasks) { this.mainConsumerClientId = mainConsumerClientId; this.restoreConsumerClientId = restoreConsumerClientId; -this.producerClientIds = Collections.unmodifiableSet(producerClientIds); +if (producerClientIds != null) { +this.producerClientIds = Collections.unmodifiableSet(producerClientIds); +} else { +this.producerClientIds = Collections.emptySet(); +} Review comment: @cadonna, I needed to add this guard as ThreadMetadataTest was failing with NPE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
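The guard described in the comment above can be shown in isolation. The sketch below uses a hypothetical `ClientIdsHolder` class rather than the real `ThreadMetadataImpl`; it relies only on the fact that `Collections.unmodifiableSet(null)` throws a `NullPointerException`, which is why a null check is needed before wrapping.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical holder class used for illustration; not the real ThreadMetadataImpl.
final class ClientIdsHolder {
    private final Set<String> producerClientIds;

    ClientIdsHolder(final Set<String> producerClientIds) {
        // Collections.unmodifiableSet(null) throws NullPointerException,
        // so fall back to an immutable empty set when no IDs are supplied.
        if (producerClientIds != null) {
            this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
        } else {
            this.producerClientIds = Collections.emptySet();
        }
    }

    Set<String> producerClientIds() {
        return producerClientIds;
    }
}
```

Either branch hands callers a read-only view, so the getter never returns null and never exposes a mutable collection.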
[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
jlprat commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r657132576 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1478,8 +1499,36 @@ public void cleanUp() { * @param storeName the {@code storeName} to find metadata for * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of * this application + * @deprecated since 3.0.0 use {@link KafkaStreams#allMetadataForGivenStore} instead */ -public Collection allMetadataForStore(final String storeName) { +@Deprecated +public Collection allMetadataForStore(final String storeName) { +validateIsRunningOrRebalancing(); +return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata -> +new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(), +streamsMetadata.stateStoreNames(), +streamsMetadata.topicPartitions(), +streamsMetadata.standbyStateStoreNames(), +streamsMetadata.standbyTopicPartitions())) +.collect(Collectors.toSet()); +} + +/** + * Find all currently running {@code KafkaStreams} instances (potentially remotely) that + * + * use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all + * instances that belong to the same Kafka Streams application) + * and that contain a {@link StateStore} with the given {@code storeName} + * + * and return {@link StreamsMetadata} for each discovered instance. + * + * Note: this is a point in time view and it may change due to partition reassignment. + * + * @param storeName the {@code storeName} to find metadata for + * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of + * this application + */ +public Collection allMetadataForGivenStore(final String storeName) { Review comment: I decided to go for the pattern xxxForxxx to keep consistency among different changes. 
It is now `streamsMetadataForStore`, but I am happy to change it if anyone has reasons against it.
[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
jlprat commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r657132064 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1558,12 +1607,45 @@ private void processStreamThread(final Consumer consumer) { for (final StreamThread thread : copy) consumer.accept(thread); } +/** + * Returns runtime information about the local threads of this {@link KafkaStreams} instance. + * + * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}. + * @deprecated since 3.0 use {@link #threadsMetadata()} + */ +@Deprecated +@SuppressWarnings("deprecation") +public Set localThreadsMetadata() { +return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata( +threadMetadata.threadName(), +threadMetadata.threadState(), +threadMetadata.consumerClientId(), +threadMetadata.restoreConsumerClientId(), +threadMetadata.producerClientIds(), +threadMetadata.adminClientId(), +threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata( +taskMetadata.taskId().toString(), +taskMetadata.topicPartitions(), +taskMetadata.committedOffsets(), +taskMetadata.endOffsets(), +taskMetadata.timeCurrentIdlingStarted()) +).collect(Collectors.toSet()), +threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata( +taskMetadata.taskId().toString(), +taskMetadata.topicPartitions(), +taskMetadata.committedOffsets(), +taskMetadata.endOffsets(), +taskMetadata.timeCurrentIdlingStarted()) +).collect(Collectors.toSet( +.collect(Collectors.toSet()); +} + /** * Returns runtime information about the local threads of this {@link KafkaStreams} instance. * * @return the set of {@link ThreadMetadata}. */ -public Set localThreadsMetadata() { +public Set threadsMetadata() { Review comment: I decided to go for the pattern xxxForxxx to keep consistency among different changes. 
It is now `metadataForLocalThreads`, but I am happy to change it if anyone has reasons against it.
[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
jlprat commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r657131881 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1458,8 +1457,30 @@ public void cleanUp() { * Note: this is a point in time view and it may change due to partition reassignment. * * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application + * @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata} */ -public Collection allMetadata() { +@Deprecated +public Collection allMetadata() { +validateIsRunningOrRebalancing(); +return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata -> +new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(), +streamsMetadata.stateStoreNames(), +streamsMetadata.topicPartitions(), +streamsMetadata.standbyStateStoreNames(), +streamsMetadata.standbyTopicPartitions())) +.collect(Collectors.toSet()); +} + +/** + * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same + * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to + * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance. + * + * Note: this is a point in time view and it may change due to partition reassignment. + * + * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application + */ +public Collection allRunningMetadata() { Review comment: I decided to go for the pattern xxxForxxx to keep consistency among different changes. It is now `metadataForAllStreamsClients`, but I am happy to change it if anyone has reasons against it.
[GitHub] [kafka] rhauch commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
rhauch commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r657118416 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + +private final RestartRequest request; +private final ConnectorStateInfo stateInfo; +private final Collection idsToRestart; + +/** + * Create a new request to restart a connector and optionally its tasks. 
+ * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ +public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { +this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); +this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); +// Collect the task IDs to stop and restart (may be none) +idsToRestart = Collections.unmodifiableList( +stateInfo.tasks() +.stream() +.filter(this::isRestarting) +.map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())) +.collect(Collectors.toList()) +); +} + +/** + * Get the connector name. + * + * @return the name of the connector; never null + */ +public String connectorName() { +return request.connectorName(); +} + +/** + * Get the original {@link RestartRequest}. + * + * @return the restart request; never null + */ +public RestartRequest restartRequest() { +return request; +} + +/** + * Get the {@link ConnectorStateInfo} that reflects the current state of the connector except with the {@code status} + * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that + * are to be restarted, based upon the {@link #restartRequest() restart request}. + * + * @return the connector state info that reflects the restart plan; never null + */ +public ConnectorStateInfo restartConnectorStateInfo() { Review comment: This is subtly different than a normal `ConnectorStateInfo`, so keeping the current name may be a bit more clear in the context where this method is used, even if it's less conventional. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
rhauch commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r655779197 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Objects; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; + +/** + * A request to restart a connector and/or task instances. + * The natural order is based upon the connector name, if two requests have the same connector name, then the requests are ordered based on the probable number of tasks/connector this request is going to restart. Review comment: Nit: ```suggestion * The natural order is based first upon the connector name and then requested restart behaviors. * If two requests have the same connector name, then the requests are ordered based on the * probable number of tasks/connector this request is going to restart. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11
[ https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368114#comment-17368114 ] Ismael Juma commented on KAFKA-12790: - [~ueisele] I submitted a fix. Thanks once again for the help! > Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent > versions of 8 and 11 > --- > > Key: KAFKA-12790 > URL: https://issues.apache.org/jira/browse/KAFKA-12790 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.0.0 > > > Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11 > and all versions of 16. Re-enable it in this test so that we can verify > the server behavior when it establishes connections with such TLS > versions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rondagostino commented on a change in pull request #10918: KAFKA-12756: Update ZooKeeper to v3.6.3
rondagostino commented on a change in pull request #10918: URL: https://github.com/apache/kafka/pull/10918#discussion_r657107216 ## File path: gradle/dependencies.gradle ## @@ -61,6 +61,7 @@ versions += [ bcpkix: "1.66", checkstyle: "8.36.2", commonsCli: "1.4", + dropwizardMetrics: "3.2.5", Review comment: > Does this conflict with the yammer metrics library It doesn't, no. The yammer metrics library is called `metrics-core-2.2.0.jar` and has all classes underneath the `com.yammer.metrics` package. The dropwizard library is called `metrics-core-3.2.5.jar` and has all classes underneath the `com.codahale.metrics` package. It isn't ideal that the jar names differ only by the version, but there is no conflict. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
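The point about the two `metrics-core` jars can be checked directly: since the libraries use different packages, both can sit on the classpath at the same time. The following is a minimal sketch, not part of the PR. The class `MetricsClasspathCheck` and its helper are hypothetical, and `com.yammer.metrics.Metrics` is assumed to be a class shipped in the yammer jar; `com.codahale.metrics.Reservoir` is the class named in the `NoClassDefFoundError` discussed for this PR.

```java
public class MetricsClasspathCheck {

    /** Returns true if the named class can be located on the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, MetricsClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names are assumptions chosen for illustration (see lead-in).
        System.out.println("yammer metrics present:     " + isOnClasspath("com.yammer.metrics.Metrics"));
        System.out.println("dropwizard metrics present: " + isOnClasspath("com.codahale.metrics.Reservoir"));
    }
}
```

Running this with the Kafka `libs` directory on the classpath would be one way to confirm that both jars resolve independently.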
[GitHub] [kafka] rondagostino commented on a change in pull request #10918: KAFKA-12756: Update ZooKeeper to v3.6.3
rondagostino commented on a change in pull request #10918: URL: https://github.com/apache/kafka/pull/10918#discussion_r657104600 ## File path: docs/upgrade.html ## @@ -21,6 +21,13 @@ Notable changes in 3.0.0 +ZooKeeper has been upgraded to 3.6.3, and that version has a hard dependency on the +io.dropwizard.metrics:metrics-core:3.2.5 library due to the new metrics subsystem added in 3.6.0. +Setting metricsProvider.className=org.apache.zookeeper.metrics.impl.NullMetricsProvider in your +zookeeper.properties file does not remove this dependency. ZooKeeper will fail to start with +java.lang.NoClassDefFoundError: com/codahale/metrics/Reservoir if you do not have the above library +on your CLASSPATH. + Review comment: > Isn't this handled automatically for users Yeah, good point, no need to discuss it here. Will remove. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12786) Getting SslTransportLayerTest error
[ https://issues.apache.org/jira/browse/KAFKA-12786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-12786. - Resolution: Duplicate Since it's the same root cause as KAFKA-12790, marking as a duplicate of that. > Getting SslTransportLayerTest error > > > Key: KAFKA-12786 > URL: https://issues.apache.org/jira/browse/KAFKA-12786 > Project: Kafka > Issue Type: Bug > Components: unit tests > Environment: Ububtu 20.04 >Reporter: Sibelle >Assignee: Ismael Juma >Priority: Major > Labels: beginner > Attachments: Error.png > > > SaslAuthenticatorTest > testRepeatedValidSaslPlainOverSsl() PASSED > org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1] > failed, log available in > /kafka/clients/build/reports/testOutput/org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1].test.stdout > SslTransportLayerTest > [1] tlsProtocol=TLSv1.2, useInlinePem=false FAILED > org.opentest4j.AssertionFailedError: Condition not met within timeout > 15000. 
Metric not updated failed-authentication-total expected:<1.0> but was:<0.0> ==> expected: <true> but was: <false>
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
> at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:320)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:317)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:301)
> at org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:196)
> at org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:155)
> at org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(SslTransportLayerTest.java:644)
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11
[ https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12790: Description: Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11 and all versions of 16. Re-enable it in this test so that we can verify the server behavior when it establishes connections with such TLS versions. was:[|https://github.com/apache/kafka/pull/10415#issuecomment-808230478] > Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent > versions of 8 and 11 > --- > > Key: KAFKA-12790 > URL: https://issues.apache.org/jira/browse/KAFKA-12790 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.0 > > > Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11 > and all versions of 16. Re-enable it in this test so that we can verify > the server behavior when it establishes connections with such TLS > versions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11
[ https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-12790: --- Reviewer: Rajini Sivaram Assignee: Ismael Juma (was: Rajini Sivaram) > Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent > versions of 8 and 11 > --- > > Key: KAFKA-12790 > URL: https://issues.apache.org/jira/browse/KAFKA-12790 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.0.0 > > > Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11 > and all versions of 16. Re-enable it in this test so that we can verify > the server behavior when it establishes connections with such TLS > versions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11
[ https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12790: Summary: Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11 (was: Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16) > Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent > versions of 8 and 11 > --- > > Key: KAFKA-12790 > URL: https://issues.apache.org/jira/browse/KAFKA-12790 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.0 > > > Details can be found in the PR: > https://github.com/apache/kafka/pull/10415#issuecomment-808230478 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11
[ https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12790: Description: [|https://github.com/apache/kafka/pull/10415#issuecomment-808230478] (was: Details can be found in the PR: https://github.com/apache/kafka/pull/10415#issuecomment-808230478) > Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent > versions of 8 and 11 > --- > > Key: KAFKA-12790 > URL: https://issues.apache.org/jira/browse/KAFKA-12790 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.0 > > > [|https://github.com/apache/kafka/pull/10415#issuecomment-808230478] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma opened a new pull request #10922: KAFKA-12790: Fix SslTransportLayerTest.testUnsupportedTlsVersion with recent JDKs
ijuma opened a new pull request #10922: URL: https://github.com/apache/kafka/pull/10922 Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11 and all versions of 16. Re-enable it in this test so that we can verify the server behavior when it establishes connections with such TLS versions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
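One way a test could re-enable TLS 1.0 and 1.1 on JDKs that disable them by default is to edit the `jdk.tls.disabledAlgorithms` security property. The sketch below is an assumption about how this might be done, not the code from this PR; the class `ReenableLegacyTls` and its methods are hypothetical names. The property usually has to be changed before the JSSE classes are first used in the JVM.

```java
import java.security.Security;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class ReenableLegacyTls {

    /** Removes the given protocol names from a comma-separated disabled-algorithms list. */
    static String withoutProtocols(String disabled, Set<String> toEnable) {
        return Arrays.stream(disabled.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty() && !toEnable.contains(entry))
                .collect(Collectors.joining(", "));
    }

    /** Rewrites jdk.tls.disabledAlgorithms so the given protocols are allowed again. */
    static void reenable(Set<String> protocols) {
        String current = Security.getProperty("jdk.tls.disabledAlgorithms");
        if (current != null) {
            Security.setProperty("jdk.tls.disabledAlgorithms", withoutProtocols(current, protocols));
        }
    }

    public static void main(String[] args) {
        // Intended for test code only: TLS 1.0/1.1 are disabled by default for good reason.
        reenable(new HashSet<>(Arrays.asList("TLSv1", "TLSv1.1")));
        System.out.println(Security.getProperty("jdk.tls.disabledAlgorithms"));
    }
}
```

Keeping the string manipulation in a separate pure function makes it easy to verify without touching JVM-wide security state.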
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-866823035

@guozhangwang / @cadonna I made some tweaks to the code and also started testing with 1M keys. Now I see differences in throughput for both range and putAll queries. Here is the comparison:

```
testPersistentRangeQueryPerformance original
Benchmark                                                            Mode  Cnt  Score   Error  Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.131 ± 0.028  ops/s

testPersistentPutAllPerformance original
Benchmark                                                            Mode  Cnt  Score   Error  Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance      thrpt   15  0.919 ± 0.037  ops/s

testPersistentRangeQueryPerformance bytebuffer
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.442 ± 0.038  ops/s

Benchmark                                                            Mode  Cnt  Score   Error  Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance      thrpt   15  1.065 ± 0.041  ops/s
```

I needed to set the ByteOrder after creating the DirectByteBuffer object, and for putAll I needed to flip the buffer before calling `put` on the batch. My next step is to create a Kafka Streams app and test throughput. I will post the results here once I have them as well.
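The two fixes mentioned in this comment (setting a byte order on the direct buffer, and flipping it before handing it on) can be sketched with plain `java.nio`. This is an illustration under assumptions, not the PR's code: the class `DirectBufferSketch` and the method `directBufferOf` are hypothetical, and the choice of `ByteOrder.nativeOrder()` is a guess, since the comment does not say which order was used.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class DirectBufferSketch {

    /** Copies the bytes into a direct buffer that is ready to be read by a consumer. */
    static ByteBuffer directBufferOf(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length);
        // Assumption: native order; a new ByteBuffer defaults to BIG_ENDIAN otherwise.
        buffer.order(ByteOrder.nativeOrder());
        buffer.put(bytes);
        // After put(), position == limit of written data; flip() resets position to 0
        // and sets limit to the number of bytes written, so a reader sees the content.
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer key = directBufferOf("key-1".getBytes(StandardCharsets.UTF_8));
        System.out.println("direct=" + key.isDirect()
                + " position=" + key.position()
                + " remaining=" + key.remaining());
    }
}
```

Without the `flip()`, a consumer reading from the current position would see zero remaining bytes, which is consistent with the need to flip before `put` described in the comment.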
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r657052453 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java ## @@ -54,9 +55,11 @@ public static Builder forReplica(short allowedVersion, int replicaId) { return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED); } -public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) { +public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) { Review comment: yep, I added a small one to verify oldest versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r657052063 ## File path: clients/src/main/resources/common/message/ListOffsetsResponse.json ## @@ -29,7 +29,9 @@ // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE. // // Version 6 enables flexible versions. - "validVersions": "0-6", + // + // Version 7 is the same as version 6. Review comment: done
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r657051441 ## File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ## @@ -149,6 +179,21 @@ class LogOffsetTest extends BaseRequestTest { assertFalse(offsetChanged) } + @Test + def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = { +val topic = "kafka-" +val topicPartition = new TopicPartition(topic, 0) +val log = createTopicAndGetLog(topic, topicPartition) + +log.updateHighWatermark(log.logEndOffset) + +val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) +assertEquals(0L, log.logEndOffset) +assertEquals(0L, maxTimestampOffset.get.offset) +assertEquals(-1L, maxTimestampOffset.get.timestamp) + Review comment: done ## File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ## @@ -266,4 +311,14 @@ class LogOffsetTest extends BaseRequestTest { .partitions.asScala.find(_.partitionIndex == tp.partition).get } + private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = { + Review comment: done ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } +@Test +public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + Review comment: done ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } +@Test +public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + +Node node = new Node(0, "localhost", 8120); +List nodes = Collections.singletonList(node); +final Cluster cluster = new Cluster( +"mockClusterId", +nodes, +Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), +Collections.emptySet(), +Collections.emptySet(), +node); 
+final TopicPartition tp0 = new TopicPartition("foo", 0); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + +// listoffsets response from broker 0 +env.kafkaClient().prepareUnsupportedVersionResponse( +request -> request instanceof ListOffsetsRequest); + +ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + +TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); +} +} + +@Test +public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { + Review comment: done ## File path: clients/src/main/resources/common/message/ListOffsetsRequest.json ## @@ -30,7 +30,9 @@ // Version 5 is the same as version 4. // // Version 6 enables flexible versions. - "validVersions": "0-6", + // + // Version 7 enables listing offsets by max timestamp. Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r657051339 ## File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ## @@ -93,16 +87,52 @@ class LogOffsetTest extends BaseRequestTest { } @Test - def testGetOffsetsBeforeLatestTime(): Unit = { + def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = { val topic = "kafka-" val topicPartition = new TopicPartition(topic, 0) +val log = createTopicAndGetLog(topic, topicPartition) -createTopic(topic, 1, 1) +for (timestamp <- 0 until 20) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0) +log.flush() -val logManager = server.getLogManager -TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, - s"Log for partition $topicPartition should be created") -val log = logManager.getLog(topicPartition).get +log.updateHighWatermark(log.logEndOffset) + +val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) +assertEquals(19L, firstOffset.get.offset) +assertEquals(19L, firstOffset.get.timestamp) + +log.truncateTo(0) + +val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) +assertEquals(0L, secondOffset.get.offset) +assertEquals(-1L, secondOffset.get.timestamp) + Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
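The assertions in these tests suggest the following behavior for a max-timestamp lookup: with timestamps 0 through 19 at offsets 0 through 19 the result is offset 19 with timestamp 19, and for an empty (or fully truncated) log the result is offset 0 with timestamp -1. A self-contained sketch of that behavior follows. It is not the broker implementation: `MaxTimestampSketch` and `TimestampAndOffset` here are hypothetical stand-ins, the log is modeled as a list whose index is the offset, and picking the earliest offset on ties is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class MaxTimestampSketch {

    /** Hypothetical result holder, modeled loosely on the values the tests assert. */
    static final class TimestampAndOffset {
        final long timestamp;
        final long offset;

        TimestampAndOffset(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    /** Finds the offset of the record with the largest timestamp; list index == offset. */
    static TimestampAndOffset maxTimestampOffset(List<Long> timestamps) {
        long bestTimestamp = -1L; // value the tests expect for an empty log
        long bestOffset = 0L;     // value the tests expect for an empty log
        for (int offset = 0; offset < timestamps.size(); offset++) {
            long timestamp = timestamps.get(offset);
            // Strict comparison: on ties the earliest offset wins (an assumption).
            if (timestamp > bestTimestamp) {
                bestTimestamp = timestamp;
                bestOffset = offset;
            }
        }
        return new TimestampAndOffset(bestTimestamp, bestOffset);
    }

    public static void main(String[] args) {
        List<Long> log = new ArrayList<>();
        for (long ts = 0; ts < 20; ts++) {
            log.add(ts);
        }
        TimestampAndOffset result = maxTimestampOffset(log);
        System.out.println("offset=" + result.offset + " timestamp=" + result.timestamp);
    }
}
```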
[GitHub] [kafka] vitojeng edited a comment on pull request #10825: KAFKA-5876: Add `streams()` method to StateStoreProvider
vitojeng edited a comment on pull request #10825: URL: https://github.com/apache/kafka/pull/10825#issuecomment-866794585

update PR & rebase trunk.

Remove:
```
KafkaStreams streams();
```
Add:
```
KafkaStreams.State streamsState();
```
[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
guozhangwang commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r657029488 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -799,41 +795,41 @@ public KafkaStreams(final Topology topology, public KafkaStreams(final Topology topology, final StreamsConfig config, final Time time) { -this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), time); +this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, new DefaultKafkaClientSupplier(), time); } -private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, - final StreamsConfig config, - final KafkaClientSupplier clientSupplier) throws StreamsException { -this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM); -} - -private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, +private KafkaStreams(final Topology topology, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final Time time) throws StreamsException { +this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier, time); +} + +protected KafkaStreams(final TopologyMetadata topologyMetadata, Review comment: Do these two functions need to be `protected` rather than `private`? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.namedTopology(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { Review comment: Maybe we should just require `atLeast(1)` in StreamsConfig definition? And then here we only need to check the first condition. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import
[GitHub] [kafka] vitojeng commented on pull request #10825: KAFKA-5876: Add `streams()` method to StateStoreProvider
vitojeng commented on pull request #10825: URL: https://github.com/apache/kafka/pull/10825#issuecomment-866794585 update PR & rebase trunk.
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r657032428 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java ## @@ -54,9 +55,11 @@ public static Builder forReplica(short allowedVersion, int replicaId) { return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED); } -public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) { +public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) { Review comment: added
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r657007472 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1444,43 +1450,63 @@ private DeleteGroupsResponse createDeleteGroupsResponse() { ); } -private ListOffsetsRequest createListOffsetRequest(int version) { +private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) { if (version == 0) { ListOffsetsTopic topic = new ListOffsetsTopic() .setName("test") .setPartitions(Arrays.asList(new ListOffsetsPartition() .setPartitionIndex(0) -.setTimestamp(100L) +.setTimestamp(timestamp) .setMaxNumOffsets(10) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder -.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) +.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version == 1) { ListOffsetsTopic topic = new ListOffsetsTopic() .setName("test") .setPartitions(Arrays.asList(new ListOffsetsPartition() .setPartitionIndex(0) -.setTimestamp(100L) +.setTimestamp(timestamp) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder -.forConsumer(true, IsolationLevel.READ_UNCOMMITTED) +.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); -} else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) { +} else if (version >= 2 && version <= 6) { ListOffsetsPartition partition = new ListOffsetsPartition() .setPartitionIndex(0) -.setTimestamp(100L) +.setTimestamp(timestamp) .setCurrentLeaderEpoch(5); ListOffsetsTopic topic = new ListOffsetsTopic() .setName("test") .setPartitions(Arrays.asList(partition)); return ListOffsetsRequest.Builder -.forConsumer(true, IsolationLevel.READ_COMMITTED) +.forConsumer(true, IsolationLevel.READ_COMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) 
.build((short) version); +} else if (version >= 7 && version <= LIST_OFFSETS.latestVersion()) { +ListOffsetsPartition partition = new ListOffsetsPartition() +.setPartitionIndex(0) +.setTimestamp(timestamp) +.setCurrentLeaderEpoch(5); + +ListOffsetsTopic topic = new ListOffsetsTopic() +.setName("test") +.setPartitions(Arrays.asList(partition)); +if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) { +return ListOffsetsRequest.Builder +.forConsumer(true, IsolationLevel.READ_COMMITTED, false) +.setTargetTimes(Collections.singletonList(topic)) +.build((short) version); +} else { Review comment: removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r657007220 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -302,10 +302,16 @@ public void testSerialization() throws Exception { checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true); checkResponse(createDeleteGroupsResponse(), 0, true); for (short version : LIST_OFFSETS.allVersions()) { -checkRequest(createListOffsetRequest(version), true); -checkErrorResponse(createListOffsetRequest(version), unknownServerException, true); +checkRequest(createListOffsetRequest(version, 100L), true); +checkErrorResponse(createListOffsetRequest(version, 100L), unknownServerException, true); checkResponse(createListOffsetResponse(version), version, true); } +LIST_OFFSETS.allVersions().stream().filter(version -> version >= (short) 7).forEach( +version -> { +checkRequest(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), true); +checkErrorResponse(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), unknownServerException, true); +} +); Review comment: coolio, have removed it. ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -432,8 +438,8 @@ public void testSerialization() throws Exception { checkRequest(createUpdateMetadataRequest(5, null), false); checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true); checkResponse(createUpdateMetadataResponse(), 0, true); -checkRequest(createListOffsetRequest(0), true); -checkErrorResponse(createListOffsetRequest(0), unknownServerException, true); +checkRequest(createListOffsetRequest(0, 100L), true); +checkErrorResponse(createListOffsetRequest(0, 100L), unknownServerException, true); Review comment: yep, removed. -- This is an automated message from the Apache Git Service. 
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r657007003 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) { } } } + +@Override +boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { +if (supportsMaxTimestamp) { +supportsMaxTimestamp = false; + +// fail any unsupported futures and remove partitions from the downgraded retry +List topicsToRemove = new ArrayList<>(); +partitionsToQuery.stream().forEach( +t -> { +List partitionsToRemove = new ArrayList<>(); +t.partitions().stream() +.filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) +.forEach( +p -> { +futures.get(new TopicPartition(t.name(), p.partitionIndex())) +.completeExceptionally( +new UnsupportedVersionException( +"Broker " + brokerId ++ " does not support MAX_TIMESTAMP offset spec")); +partitionsToRemove.add(p); + +}); +t.partitions().removeAll(partitionsToRemove); +if (t.partitions().isEmpty()) topicsToRemove.add(t); +} +); +partitionsToQuery.removeAll(topicsToRemove); + +return !partitionsToQuery.isEmpty(); Review comment: good point, I've added a check for this. 
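The downgrade logic in `handleUnsupportedVersionException` above follows a simple pattern: fail the futures of partitions that asked for the max timestamp, drop those partitions from the request, and retry only if something is left. Below is a simplified, self-contained sketch of that pattern. It is not the admin client code: `DowngradeRetrySketch` is hypothetical, partitions are modeled as a map from a partition name to the requested timestamp, `UnsupportedOperationException` stands in for Kafka's `UnsupportedVersionException`, and the sentinel value `-3L` for `MAX_TIMESTAMP` is an assumption.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class DowngradeRetrySketch {

    // Assumed sentinel; stands in for ListOffsetsRequest.MAX_TIMESTAMP.
    static final long MAX_TIMESTAMP = -3L;

    /**
     * Fails and removes every partition that requested MAX_TIMESTAMP.
     * Returns true if any partitions remain, meaning the request should be retried.
     */
    static boolean dropUnsupported(Map<String, Long> partitionsToQuery,
                                   Map<String, CompletableFuture<Long>> futures,
                                   int brokerId) {
        Iterator<Map.Entry<String, Long>> it = partitionsToQuery.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() == MAX_TIMESTAMP) {
                futures.get(entry.getKey()).completeExceptionally(
                        new UnsupportedOperationException(
                                "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
                // Iterator.remove avoids ConcurrentModificationException while iterating
                it.remove();
            }
        }
        return !partitionsToQuery.isEmpty();
    }
}
```

Removing through the iterator keeps the filtering in one pass, which avoids the separate `partitionsToRemove` and `topicsToRemove` lists; whether that fits the nested topic/partition structure in the real request is a separate question.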
## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } +@Test +public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + +Node node = new Node(0, "localhost", 8120); +List nodes = Collections.singletonList(node); +final Cluster cluster = new Cluster( +"mockClusterId", +nodes, +Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), +Collections.emptySet(), +Collections.emptySet(), +node); +final TopicPartition tp0 = new TopicPartition("foo", 0); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + +// listoffsets response from broker 0 +env.kafkaClient().prepareUnsupportedVersionResponse( +request -> request instanceof ListOffsetsRequest); + +ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + +TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); +} +} + +@Test +public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { + +Node node = new Node(0, "localhost", 8120); +List nodes = Collections.singletonList(node); +List pInfos = new ArrayList<>(); +pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); +pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); +final Cluster cluster = new Cluster( +"mockClusterId", +nodes, +pInfos, +Collections.emptySet(), +Collections.emptySet(), +node); +final TopicPartition tp0 = new TopicPartition("foo", 0); +final TopicPartition tp1 = new TopicPartition("foo", 1); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, 
+AdminClientConfig.RETRIES_CONFIG, "2")) { + +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + +// listoffsets response from broker 0 +env.kafkaClient().prepareUnsupportedVersionResponse( +request -> request
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657004623

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
                     }
                 }
             }
+
+            @Override
+            boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                if (supportsMaxTimestamp) {
+                    supportsMaxTimestamp = false;
+
+                    // fail any unsupported futures and remove partitions from the downgraded retry
+                    List topicsToRemove = new ArrayList<>();
+                    partitionsToQuery.stream().forEach(
+                        t -> {
+                            List partitionsToRemove = new ArrayList<>();
+                            t.partitions().stream()
+                                .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                .forEach(
+                                    p -> {
+                                        futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                            .completeExceptionally(
+                                                new UnsupportedVersionException(
+                                                    "Broker " + brokerId
+                                                        + " does not support MAX_TIMESTAMP offset spec"));
+                                        partitionsToRemove.add(p);
+
+                                    });
+                            t.partitions().removeAll(partitionsToRemove);
+                            if (t.partitions().isEmpty()) topicsToRemove.add(t);
+                        }
+                    );
+                    partitionsToQuery.removeAll(topicsToRemove);

Review comment:
agreed, looks much neater.
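For readers following the thread, the downgrade step quoted above (fail the futures that asked for MAX_TIMESTAMP, prune them from the pending request, retry only if something is left) can be sketched without any Kafka classes. This is a minimal illustration, not the code that was merged: `Topic`, `Partition`, the `"topic-index"` future keys, the sentinel value and the use of `UnsupportedOperationException` in place of Kafka's `UnsupportedVersionException` are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class MaxTimestampDowngradeSketch {

    // Illustrative sentinel; stands in for ListOffsetsRequest.MAX_TIMESTAMP.
    static final long MAX_TIMESTAMP = -3L;

    // Hypothetical stand-in for a partition entry of the pending request.
    static final class Partition {
        final int index;
        final long timestamp;

        Partition(int index, long timestamp) {
            this.index = index;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in for a topic entry of the pending request.
    static final class Topic {
        final String name;
        final List<Partition> partitions;

        Topic(String name, List<Partition> partitions) {
            this.name = name;
            this.partitions = new ArrayList<>(partitions); // mutable copy so we can prune
        }
    }

    /**
     * Fails the futures of all partitions that requested MAX_TIMESTAMP, removes them
     * (and any topic left without partitions) from the pending request, and returns
     * true if there is still something to retry.
     */
    static boolean failUnsupportedAndPrune(List<Topic> toQuery,
                                           Map<String, CompletableFuture<Long>> futures) {
        Iterator<Topic> topics = toQuery.iterator();
        while (topics.hasNext()) {
            Topic topic = topics.next();
            Iterator<Partition> partitions = topic.partitions.iterator();
            while (partitions.hasNext()) {
                Partition p = partitions.next();
                if (p.timestamp != MAX_TIMESTAMP) {
                    continue; // supported spec, keep it for the retry
                }
                CompletableFuture<Long> future = futures.get(topic.name + "-" + p.index);
                if (future != null) {
                    future.completeExceptionally(new UnsupportedOperationException(
                        "broker does not support MAX_TIMESTAMP offset spec"));
                }
                partitions.remove();
            }
            if (topic.partitions.isEmpty()) {
                topics.remove();
            }
        }
        return !toQuery.isEmpty();
    }
}
```

Removing through the iterators avoids the temporary `partitionsToRemove` and `topicsToRemove` lists; whether that matches the "neater" form agreed on in the review is not visible in this message.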
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r656955014

## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ##
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List nodes = Collections.singletonList(node);
+        List pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+        Node node = new Node(0, "localhost", 8120);
+        List nodes = Collections.singletonList(node);
+        List pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r656890122

## File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java ##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public interface TaskMetadata {
+
+    /**
+     * @return the basic task metadata such as subtopology and partition id
+     */
+    TaskId taskId();
+
+    /**
+     * This function will return a set of the current TopicPartitions
+     */
+    Set topicPartitions();

Review comment:
Could you please use javadoc mark-up like `@return` and `@param` for the docs? Here and for the other methods.

## File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java ##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     */
+    Set activeTasks();

Review comment:
Could you use javadoc mark-up for the docs? Here and for the other methods.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java ##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public class ThreadMetadataImpl implements ThreadMetadata {
+
+    private final String threadName;
+
+    private final String threadState;
+
+    private final Set activeTasks;
+
+    private final Set standbyTasks;
+
+    private final String mainConsumerClientId;
+
+    private final String
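The review request in this message (use `@return` and `@param` instead of free-form "This function will return ..." sentences) could look roughly like the following. The interface, its method names and the `String` partition names are hypothetical and exist only to show the tag style; they are not the API proposed in the pull request.

```java
import java.util.Set;

/**
 * Hypothetical interface used only to illustrate the requested Javadoc mark-up.
 */
interface TaskMetadataDocSketch {

    /**
     * Returns the partitions currently assigned to the task.
     *
     * @return the set of topic-partition names; never {@code null}
     */
    Set<String> topicPartitions();

    /**
     * Checks whether the task currently reads from the given partition.
     *
     * @param topicPartition the partition name to look up, for example {@code "foo-0"}
     * @return {@code true} if the partition is assigned to the task
     */
    default boolean isAssigned(String topicPartition) {
        return topicPartitions().contains(topicPartition);
    }
}
```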
[GitHub] [kafka] PhilHardwick opened a new pull request #10921: MINOR: Ensure queryable store providers is up to date after adding stream thread
PhilHardwick opened a new pull request #10921:
URL: https://github.com/apache/kafka/pull/10921

When a new thread is added, the queryable store providers object continues to use the store providers it was given when KafkaStreams was instantiated. I wanted to keep QueryableStoreProviders immutable, so I had to make the queryableStoreProvider field in the KafkaStreams class mutable to allow this change.

This is tested via an integration test: after adding a thread and producing messages with different keys, the keys are not in the store with the previous code, and they are queryable after the change.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
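The design choice described in this pull request, keeping the provider object immutable and replacing the reference held by the owner when a thread is added, can be sketched as follows. Everything here is a stand-in: `Providers` plays the role of QueryableStoreProviders, thread names replace the real per-thread store providers, and making the field `volatile` is an assumption of this sketch rather than something stated in the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class StoreProviderRefreshSketch {

    /** Immutable snapshot of the per-thread providers (hypothetical stand-in). */
    static final class Providers {
        private final List<String> threadProviders;

        Providers(List<String> threadProviders) {
            // Defensive copy so later changes to the caller's list are not visible here.
            this.threadProviders = Collections.unmodifiableList(new ArrayList<>(threadProviders));
        }

        List<String> threadProviders() {
            return threadProviders;
        }
    }

    private final List<String> threads = new ArrayList<>();

    // Mutable reference to an immutable snapshot; volatile (an assumption of this
    // sketch) so that readers on other threads observe the replacement.
    private volatile Providers providers;

    StoreProviderRefreshSketch(List<String> initialThreads) {
        threads.addAll(initialThreads);
        providers = new Providers(threads);
    }

    /** Registers a new thread and publishes a fresh snapshot that includes it. */
    synchronized void addThread(String name) {
        threads.add(name);
        providers = new Providers(threads);
    }

    Providers providers() {
        return providers;
    }
}
```

A caller that fetched `providers()` before `addThread` keeps its old, consistent snapshot, while later calls see the new thread, which is the behaviour the integration test in the description checks for.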
[jira] [Commented] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367967#comment-17367967 ]

Luke Chen commented on KAFKA-12984:
-----------------------------------

Good root cause analysis! And I agree the solution (a) that the sticky assignment algorithm to resolve cases of improper input conditions by invalidating the "ownedPartitions" in cases of double ownership.

> Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12984
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12984
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Blocker
>             Fix For: 3.0.0, 2.8.1
>
>
> Some users have reported seeing their consumer group become stuck in the
> CompletingRebalance phase when using the cooperative-sticky assignor. Based
> on the request metadata we were able to deduce that multiple consumers were
> reporting the same partition(s) in their "ownedPartitions" field of the
> consumer protocol. Since this is an invalid state, the input causes the
> cooperative-sticky assignor to detect that something is wrong and throw an
> IllegalStateException. If the consumer application is set up to simply retry,
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's
> SubscriptionState, which was assumed to always be up to date. However there
> may be cases where the consumer has dropped out of the group but fails to
> clear the SubscriptionState, allowing it to report some partitions as owned
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of
> improper input conditions by invalidating the "ownedPartitions" in cases of
> double ownership, and (b) shore up the ConsumerCoordinator logic to better
> handle rejoining the group and keeping its internal state consistent. See
> KAFKA-12983 for more details on (b)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
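Option (a) in the issue, invalidating "ownedPartitions" when more than one consumer claims the same partition, can be illustrated with a small stand-alone sketch. It assumes the simplest possible policy, dropping a doubly claimed partition from every claimant so the assignor treats it as unowned; the actual fix in Kafka may resolve the conflict differently. The class name, method name and the use of plain strings for consumers and partitions are all made up for this example.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class DoubleOwnershipSketch {

    /**
     * Returns a copy of the ownership map in which every partition claimed by
     * more than one consumer has been removed from all of its claimants.
     */
    static Map<String, Set<String>> invalidateDoubleOwned(Map<String, Set<String>> ownedByConsumer) {
        // First pass: count how many consumers claim each partition.
        Map<String, Integer> claims = new HashMap<>();
        for (Set<String> owned : ownedByConsumer.values()) {
            for (String partition : owned) {
                claims.merge(partition, 1, Integer::sum);
            }
        }

        // Second pass: keep only partitions with exactly one claimant.
        Map<String, Set<String>> cleaned = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> entry : ownedByConsumer.entrySet()) {
            Set<String> kept = new TreeSet<>();
            for (String partition : entry.getValue()) {
                if (claims.get(partition) == 1) {
                    kept.add(partition);
                }
            }
            cleaned.put(entry.getKey(), kept);
        }
        return cleaned;
    }
}
```

With input `{consumer-a: [t-0, t-1], consumer-b: [t-1, t-2]}` the doubly claimed `t-1` is dropped from both consumers, so the input handed to the assignor no longer contains the invalid state that triggered the IllegalStateException described above.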
[jira] [Comment Edited] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367967#comment-17367967 ]

Luke Chen edited comment on KAFKA-12984 at 6/23/21, 8:42 AM:
-------------------------------------------------------------

Good root cause analysis! And I agree the solution (a) that the sticky assignment algorithm should resolve cases of improper input conditions by invalidating the "ownedPartitions" in cases of double ownership.

was (Author: showuon):
Good root cause analysis! And I agree the solution (a) that the sticky assignment algorithm to resolve cases of improper input conditions by invalidating the "ownedPartitions" in cases of double ownership.
[GitHub] [kafka] cadonna commented on pull request #10897: MINOR: Reduced severity for "skipping records" falling out of time windows
cadonna commented on pull request #10897:
URL: https://github.com/apache/kafka/pull/10897#issuecomment-866637956

FYI: I opened PR #10920 to improve the unit tests for the log messages for the other types of dropped records.
[GitHub] [kafka] cadonna commented on pull request #10920: MINOR: Improve test of log messages for dropped records
cadonna commented on pull request #10920:
URL: https://github.com/apache/kafka/pull/10920#issuecomment-866636831

Call for review: @xdgrulez @mjsax
[GitHub] [kafka] cadonna opened a new pull request #10920: MINOR: Improve test of log messages for dropped records
cadonna opened a new pull request #10920:
URL: https://github.com/apache/kafka/pull/10920

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dongjinleekr opened a new pull request #10919: KAFKA-12985: CVE-2021-28169 - Upgrade jetty to 9.4.41
dongjinleekr opened a new pull request #10919:
URL: https://github.com/apache/kafka/pull/10919

[Jetty 9.4.41.v20210516](https://github.com/eclipse/jetty.project/releases/tag/jetty-9.4.41.v20210516) resolves the following security vulnerabilities:

- [CVE-2021-28169](https://nvd.nist.gov/vuln/detail/CVE-2021-28169) (described in the issue)
- [CVE-2021-34428](https://nvd.nist.gov/vuln/detail/CVE-2021-34428)

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-12985) CVE-2021-28169 - Upgrade jetty to 9.4.41
Dongjin Lee created KAFKA-12985:
-----------------------------------

             Summary: CVE-2021-28169 - Upgrade jetty to 9.4.41
                 Key: KAFKA-12985
                 URL: https://issues.apache.org/jira/browse/KAFKA-12985
             Project: Kafka
          Issue Type: Task
          Components: security
            Reporter: Dongjin Lee
            Assignee: Dongjin Lee

The CVE-2021-28169 vulnerability affects Jetty versions up to 9.4.40. For more information see https://nvd.nist.gov/vuln/detail/CVE-2021-28169

Upgrading to Jetty version 9.4.41 should address this issue (https://github.com/eclipse/jetty.project/security/advisories/GHSA-gwcr-j4wh-j3cq).