[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools
fvaleri commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1398707514

Output example:

```sh
$ bin/kafka-run-class.sh org.apache.kafka.tools.JmxCommand \
    --jmx-url service:jmx:rmi:///jndi/rmi://:/jmxrmi \
    --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
    --attributes FifteenMinuteRate,FiveMinuteRate \
    --date-format "MMdd-hh:mm:ss" \
    --reporting-interval 1000 \
    --report-format tsv
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://:/jmxrmi
time 20230120-06:23:14
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate 0.0
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate 0.0
time 20230120-06:23:15
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate 0.0
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate 0.0
time 20230120-06:23:16
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate 0.0
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate 0.0
^C
```

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
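For readers unfamiliar with what such a tool does underneath, the following is a minimal sketch of reading MBean attributes through the standard `javax.management` API. It is not the tool's actual implementation: the class name `JmxAttributeSketch` and the method `readAttributes` are hypothetical, and the example queries the local JVM's platform MBean server (the `java.lang:type=Runtime` bean) rather than a remote broker, so that it runs without a Kafka process.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical illustration; not part of Kafka.
final class JmxAttributeSketch {

    // Reads the given attributes from every MBean whose name matches the pattern.
    // Keys have the form "<object name>:<attribute>", mirroring the output above.
    static Map<String, Object> readAttributes(MBeanServerConnection connection,
                                              String objectNamePattern,
                                              String... attributes) throws Exception {
        Map<String, Object> values = new TreeMap<>();
        for (ObjectName name : connection.queryNames(new ObjectName(objectNamePattern), null)) {
            for (String attribute : attributes) {
                values.put(name + ":" + attribute, connection.getAttribute(name, attribute));
            }
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the local platform MBean server stands in for a remote JMX connection.
        MBeanServerConnection connection = ManagementFactory.getPlatformMBeanServer();
        readAttributes(connection, "java.lang:type=Runtime", "VmName", "Uptime")
            .forEach((key, value) -> System.out.println(key + "\t" + value));
    }
}
```

Typing the parameter as `MBeanServerConnection` means the same method would also accept a connection obtained from a remote JMX URL via `JMXConnectorFactory`, though that path is not exercised here.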
[GitHub] [kafka] kamalcph commented on pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes
kamalcph commented on PR #13060: URL: https://github.com/apache/kafka/pull/13060#issuecomment-1398673057 @ijuma @showuon Please take a look when you get a chance.
[GitHub] [kafka] kamalcph commented on pull request #13059: MINOR: KafkaConfig should not expose internal config when queried for non-internal values
kamalcph commented on PR #13059: URL: https://github.com/apache/kafka/pull/13059#issuecomment-1398670571 @ijuma @showuon can you please merge the patch?
[GitHub] [kafka] fvaleri opened a new pull request, #13136: KAFKA-14582: Move JmxTool to tools
fvaleri opened a new pull request, #13136: URL: https://github.com/apache/kafka/pull/13136 This PR is based on https://github.com/apache/kafka/pull/13131. This class is also used by the system tests, so I need to check if the replacement works fine there too.
[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1082712140 ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. 
+ * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * NOTE: The function name is not strictly speaking correct anymore + * as it also checks whether the version needs to be printed, but + * refactoring this would have meant changing all command line tools + * and unnecessarily increased the blast radius of this change. + * + * @param commandOpts Acceptable options for a command + * @param message Message to display on successful check + */ +public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) { +if (isPrintHelpNeeded(commandOpts)) { +printUsageAndDie(commandOpts.parser, message); +} +if (isPrintVersionNeeded(commandOpts)) { +printVersionAndDie(); +} +} + +/** + * Check that all the listed options are present. + */ +public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec... requiredList) { +for (OptionSpec arg : requiredList) { +if (!options.has(arg)) { +printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg)); +} +} +} + +/** + * Check that none of the listed options are present. + */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +OptionSpec... invalidOptions) { +if (options.has(usedOption)) { +for (OptionSpec arg : invalidOptions) { +if (options.has(arg)) { +printUsageAndDie(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg)); +} +} +} +} + +/** + * Check that none of the listed options are present. 
+ */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +Set> invalidOptions) { +OptionSpec[] array = new OptionSpec[invalidOptions.size()]; +invalidOptions.toArray(array); +checkInvalidArgs(parser, options, usedOption, array); +} + +/** + * Check that none of the listed options are present with the combination of used options. + */ +public static void
[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
clolov commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1082663006 ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. 
+ * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * NOTE: The function name is not strictly speaking correct anymore + * as it also checks whether the version needs to be printed, but + * refactoring this would have meant changing all command line tools + * and unnecessarily increased the blast radius of this change. + * + * @param commandOpts Acceptable options for a command + * @param message Message to display on successful check + */ +public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) { +if (isPrintHelpNeeded(commandOpts)) { +printUsageAndDie(commandOpts.parser, message); +} +if (isPrintVersionNeeded(commandOpts)) { +printVersionAndDie(); +} +} + +/** + * Check that all the listed options are present. + */ +public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec... requiredList) { +for (OptionSpec arg : requiredList) { +if (!options.has(arg)) { +printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg)); +} +} +} + +/** + * Check that none of the listed options are present. + */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +OptionSpec... invalidOptions) { +if (options.has(usedOption)) { +for (OptionSpec arg : invalidOptions) { +if (options.has(arg)) { +printUsageAndDie(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg)); +} +} +} +} + +/** + * Check that none of the listed options are present. 
+ */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +Set> invalidOptions) { +OptionSpec[] array = new OptionSpec[invalidOptions.size()]; +invalidOptions.toArray(array); +checkInvalidArgs(parser, options, usedOption, array); +} + +/** + * Check that none of the listed options are present with the combination of used options. + */ +public static void c
[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082623877

## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   OK, that's fine.
[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082622801

## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   The PR says:

   > we pushed the skipping of key/value logic to zstd-jni implementation instead of using the one provided by BufferedInputStream
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082615267

## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   This will be picked up as a separate follow-up PR: https://issues.apache.org/jira/browse/KAFKA-14634

   I decided to keep it separate in order to introduce fewer changes at a time and measure their impact.
[GitHub] [kafka] ijuma commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
ijuma commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1398459548 One more thing: when it comes to the testing, can we include the case where the batches have a single 10-byte message?
[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082588469

## clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java: ##
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
                 }
             };

-            // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-            // in cases where the caller reads a small number of bytes (potentially a single byte).
-            return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
-                bufferPool), 16 * 1024);
+            // We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using
+            // `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the
+            // caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI
+            // calls.
+            return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   2 questions:
   1. Why do we wrap into DataInputStream?
   2. Have we checked that there are no workloads where we end up doing too many JNI calls?

## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ##
@@ -47,6 +47,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return new ByteBufferInputStream(buffer);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   What's the meaning of this for an uncompressed stream?

## clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java: ##
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;

-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;

 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   Since this is unrelated, do we have to include it as part of this PR?

## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   We decided not to get this info from the zstd library?

## clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java: ##
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }

     private CloseableIterator compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);

         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   I thought we wanted to call the underlying skipBytes API versus doing the skipping by reading into a skip buffer. I don't see that change. What am I missing?
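Two of the points raised in this review can be illustrated with plain `java.io` classes: avoiding a second `DataInputStream` wrapper when the stream already is one, and skipping a length-delimited field through the stream's own `skipBytes` rather than by reading into a scratch buffer. The following is only a sketch under those assumptions. `StreamSkipSketch`, `asDataInputStream` and `skipFully` are hypothetical names, the one-byte length prefix is a simplification of the real record format, and none of this is the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical illustration; not part of Kafka.
final class StreamSkipSketch {

    // Reuses the stream when it already is a DataInputStream instead of wrapping it again.
    static DataInputStream asDataInputStream(InputStream in) {
        return in instanceof DataInputStream ? (DataInputStream) in : new DataInputStream(in);
    }

    // Skips exactly `length` bytes by delegating to the stream's skip support.
    // skipBytes may skip fewer bytes than requested, so loop until done.
    static void skipFully(DataInputStream in, int length) throws IOException {
        int remaining = length;
        while (remaining > 0) {
            int skipped = in.skipBytes(remaining);
            if (skipped == 0) {
                // A zero return does not necessarily mean end of stream; probe with a read.
                if (in.read() < 0) {
                    throw new EOFException("Stream ended with " + remaining + " bytes left to skip");
                }
                skipped = 1;
            }
            remaining -= skipped;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed toy layout: a one-byte length, a field of that length, then one payload byte.
        byte[] data = {3, 10, 20, 30, 42};
        DataInputStream in = asDataInputStream(new ByteArrayInputStream(data));
        int fieldLength = in.readUnsignedByte();
        skipFully(in, fieldLength);
        System.out.println(in.readUnsignedByte()); // prints 42
    }
}
```

Whether delegating the skip is cheaper than reading into a buffer depends on how the underlying stream implements `skip`, which is exactly the question the reviewer is asking about the zstd-jni stream.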
[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1398428117 @ijuma please review when you get a chance since you already have context about this code change.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082366873

## clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java: ##
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;

-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;

 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   FYI reviewer: This change is a no-op since we are already using the default value of 3 when no value is provided. This change has been made to make it explicit in Kafka code that we are using compression level of 3 with zstd.
[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
clolov commented on PR #12821: URL: https://github.com/apache/kafka/pull/12821#issuecomment-1398172190 Thank you very much for the review and merge!
[GitHub] [kafka] divijvaidya opened a new pull request, #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya opened a new pull request, #13135:
URL: https://github.com/apache/kafka/pull/13135

This covers two JIRAs: https://issues.apache.org/jira/browse/KAFKA-14632 and https://issues.apache.org/jira/browse/KAFKA-14633

## Background
![Screenshot 2023-01-19 at 18 27 45](https://user-images.githubusercontent.com/71267/213521204-bb3228ed-7d21-4e07-a520-697ea6fcc0ed.png)

Currently, we use 2 intermediate buffers while handling decompressed data (one of size 2KB and another of size 16KB). These buffers are (de)allocated once per batch.

The impact of this was observed in a flamegraph analysis for a compressed workload, where 75% of CPU during `appendAsLeader()` is taken up by `ValidateMessagesAndAssignOffsets`.

![Screenshot 2023-01-20 at 10 41 08](https://user-images.githubusercontent.com/71267/213664252-389eaf3d-b8aa-465b-b010-db1024663d6f.png)

## Change
With this PR:
1. We reduce the number of intermediate buffers from 2 to 1. This removes one point of data copy. Note that the removed data copy occurred in chunks of 2KB at a time, multiple times. This is achieved by getting rid of `BufferedInputStream` and moving to `DataInputStream`. This change has only been made for `zstd` and `gzip`.
2. We use a thread-local buffer pool for both of the buffers involved in decompression. This change impacts all compression types.
3. We pushed the skipping of key/value logic to the zstd-jni implementation instead of using the one provided by `BufferedInputStream`.

After the change, the above buffer allocation looks as follows:
![Screenshot 2023-01-19 at 18 28 14](https://user-images.githubusercontent.com/71267/213525653-917ac5ee-810a-435e-bf84-c97d6b76005e.png)

## Results
After this change, a JMH benchmark for `ValidateMessagesAndAssignOffsets` demonstrated 10-50% increased throughput across all compression types without any regression. The improvement is prominent when thread-cached buffer pools are used, with a 1-2% regression in some limited scenarios.

When buffer pools are not used (NO_CACHING in the results), we observed GZIP having 10% better performance in some cases, with a 1-4% regression in some other scenarios. Overall, without the buffer pools, the upside of this code change is limited to single-digit improvements in certain scenarios.

Detailed results from the JMH benchmark are available here: [benchmark-jira.xlsx](https://github.com/apache/kafka/files/10465049/benchmark-jira.xlsx)

## Testing
- Sanity testing using the existing unit test to ensure that we don't impact correctness.
- JMH benchmarks for all compression types to ensure that we did not regress other compression types.
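The idea of reusing a decompression buffer per thread instead of allocating one per batch can be sketched with the JDK's own GZIP classes. This is an illustration only: `BufferReuseSketch` and its methods are hypothetical names, the 16KB size simply echoes the figure mentioned above, and a plain `ThreadLocal<byte[]>` stands in for Kafka's buffer supplier, which works differently in detail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration; not part of Kafka.
final class BufferReuseSketch {
    private static final int BUFFER_SIZE = 16 * 1024;

    // One scratch buffer per thread, allocated lazily and reused for every batch.
    private static final ThreadLocal<byte[]> SCRATCH =
        ThreadLocal.withInitial(() -> new byte[BUFFER_SIZE]);

    // Helper that produces a compressed "batch" for the demonstration.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        }
        return out.toByteArray();
    }

    // Decompresses one batch and returns its uncompressed length.
    // No buffer is allocated here; the thread's scratch buffer is reused.
    static long decompressedLength(byte[] compressed) throws IOException {
        byte[] scratch = SCRATCH.get();
        long total = 0;
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            int read;
            while ((read = in.read(scratch, 0, scratch.length)) != -1) {
                total += read;
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] batch = gzip("hello kafka".getBytes(StandardCharsets.UTF_8));
        for (int i = 0; i < 3; i++) {
            System.out.println(decompressedLength(batch)); // prints 11 each time
        }
    }
}
```

The trade-off of a thread-local buffer is that its memory stays alive for as long as the thread does, which is usually acceptable for a small fixed-size scratch buffer on long-lived request handler threads.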
[GitHub] [kafka] ableegoldman opened a new pull request, #13134: MINOR: fix flaky StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription
ableegoldman opened a new pull request, #13134: URL: https://github.com/apache/kafka/pull/13134 This test is supposed to be a sanity check that rebalancing with a large number of partitions/consumers won't start to take obscenely long or approach the `max.poll.interval.ms` -- bumping up the timeout by another 30s still feels very reasonable considering the test is for 1 million partitions
[GitHub] [kafka] hzh0425 opened a new pull request, #13133: MINOR: Fix some typos in remote.metadata.storage
hzh0425 opened a new pull request, #13133: URL: https://github.com/apache/kafka/pull/13133 Fix some typos in storage module / remote.metadata.storage ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac merged pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac merged PR #13112: URL: https://github.com/apache/kafka/pull/13112
[GitHub] [kafka] showuon commented on a diff in pull request #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation
showuon commented on code in PR #13099:
URL: https://github.com/apache/kafka/pull/13099#discussion_r1082162567

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ##
@@ -1496,4 +1496,32 @@ public static String replaceSuffix(String str, String oldSuffix, String newSuffi
             throw new IllegalArgumentException("Expected string to end with " + oldSuffix + " but string is " + str);
         return str.substring(0, str.length() - oldSuffix.length()) + newSuffix;
     }
+
+    public static long zeroIfNegative(long value) {
+        return Math.max(0L, value);
+    }
+
+    // returns the sum of a and b unless it would overflow, which will return Long.MAX_VALUE
+    public static long saturatedAdd(long a, long b) {
+        long result = Long.MAX_VALUE;
+        try {
+            result = Math.addExact(a, b);
+        } catch (ArithmeticException e) {
+            log.info("The sum of {} and {} is overflowed, set to Long.MAX_VALUE", a, b);

Review Comment:
   Hmm. OK. Let's throw an exception.
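The two behaviours under discussion, clamping on overflow versus throwing, can be compared in a small standalone sketch. `SaturatedMathSketch` is a hypothetical class and not the code that was eventually merged. Note one deliberate difference from the quoted diff: this version also clamps to `Long.MIN_VALUE` when two negative operands overflow, whereas the quoted code returns `Long.MAX_VALUE` for any overflow.

```java
// Hypothetical illustration; not part of Kafka.
final class SaturatedMathSketch {

    static long zeroIfNegative(long value) {
        return Math.max(0L, value);
    }

    // Clamping variant: returns the sum, or the nearest representable bound on overflow.
    // Overflow is only possible when both operands have the same sign,
    // so the sign of `a` tells us which bound was crossed.
    static long saturatedAdd(long a, long b) {
        try {
            return Math.addExact(a, b);
        } catch (ArithmeticException e) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        System.out.println(saturatedAdd(2L, 3L));             // prints 5
        System.out.println(saturatedAdd(Long.MAX_VALUE, 1L)); // prints 9223372036854775807

        // Throwing variant: Math.addExact surfaces the overflow to the caller.
        try {
            Math.addExact(Long.MAX_VALUE, 1L);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected");
        }
    }
}
```

Clamping hides the problem from the caller, while throwing makes the caller decide how to handle a value that cannot be represented; the review comment above opts for the latter.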
[GitHub] [kafka] yufeiyan1220 commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown
yufeiyan1220 commented on PR #13125:
URL: https://github.com/apache/kafka/pull/13125#issuecomment-1397979098

> Seems like we aren't particularly consistent at removing these metrics and sensors, fetcher would be another example. Mind making the clean up more comprehensive?

Never mind. I think it would be better to make the metric removal logic consistent. Let me take some time to look for these cases.
[GitHub] [kafka] philipnee commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown
philipnee commented on PR #13125: URL: https://github.com/apache/kafka/pull/13125#issuecomment-1397952270 Seems like we aren't particularly consistent at removing these metrics and sensors, fetcher would be another example. Mind making the clean up more comprehensive?
[GitHub] [kafka] ableegoldman opened a new pull request, #13132: MINOR: fix warnings in Streams javadocs
ableegoldman opened a new pull request, #13132: URL: https://github.com/apache/kafka/pull/13132 While working on the 3.4 release I noticed we've built up an embarrassingly long list of warnings within the Streams javadocs. It's unavoidable for some links to break as the source code changes, but let's reset back to a good state before the list gets even longer
[GitHub] [kafka] mjsax merged pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets
mjsax merged PR #13129: URL: https://github.com/apache/kafka/pull/13129
[GitHub] [kafka] yufeiyan1220 commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown
yufeiyan1220 commented on PR #13125: URL: https://github.com/apache/kafka/pull/13125#issuecomment-1397837046 > Thanks for the PR and the issue @yufeiyan1220 - I wonder if the clean up is necessary, as the metrics will be closed upon the client closing. Willing to hear what others say. I found this problem when I tried to create and close consumers frequently. As in the test case I added to `KafkaConsumerTest.java`, these consumer coordinator metrics are still present if I do not close them explicitly. Most users probably do not create and close consumers frequently, but there is a potential memory leak.
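The leak described above can be illustrated with a minimal, stdlib-only sketch. It assumes a metrics registry that outlives the components registering into it; `SketchMetricsRegistry`, `SketchCoordinatorMetrics`, and the metric names are hypothetical stand-ins, not Kafka classes or the code in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a shared metrics registry; not the Kafka Metrics class.
class SketchMetricsRegistry {
    private final Map<String, Double> metrics = new ConcurrentHashMap<>();

    void register(String name, double initialValue) { metrics.put(name, initialValue); }
    void remove(String name) { metrics.remove(name); }
    int size() { return metrics.size(); }
}

// Hypothetical component that registers two metrics and removes them in close().
class SketchCoordinatorMetrics implements AutoCloseable {
    private final SketchMetricsRegistry registry;
    private final List<String> registered = new ArrayList<>();

    SketchCoordinatorMetrics(SketchMetricsRegistry registry, String clientId) {
        this.registry = registry;
        add(clientId + ".heartbeat-rate");
        add(clientId + ".join-rate");
    }

    private void add(String name) {
        registry.register(name, 0.0);
        registered.add(name);
    }

    @Override
    public void close() {
        // Remove exactly the metrics this component registered.
        for (String name : registered) {
            registry.remove(name);
        }
        registered.clear();
    }
}

class MetricCleanupSketch {
    // Creates `rounds` components against one shared registry and returns
    // how many metrics remain registered afterwards.
    static int leftoverAfter(int rounds, boolean closeExplicitly) {
        SketchMetricsRegistry registry = new SketchMetricsRegistry();
        for (int i = 0; i < rounds; i++) {
            SketchCoordinatorMetrics m = new SketchCoordinatorMetrics(registry, "consumer-" + i);
            if (closeExplicitly) {
                m.close();
            }
        }
        return registry.size();
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + leftoverAfter(100, false));
        System.out.println("with cleanup:    " + leftoverAfter(100, true));
    }
}
```

In this sketch the registry grows by two entries per component unless `close()` removes them, which is the pattern the test case in the PR is said to exercise.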
[GitHub] [kafka] Gerrrr commented on pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets
Gerrrr commented on PR #13129: URL: https://github.com/apache/kafka/pull/13129#issuecomment-1397815859 @hachikuji @jolshan I don't have Kafka committer privileges. Can you please commit?
[GitHub] [kafka] Gerrrr commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets
Gerrrr commented on code in PR #13129: URL: https://github.com/apache/kafka/pull/13129#discussion_r1081881848 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -318,7 +318,8 @@ public class ProducerConfig extends AbstractConfig { /** transaction.timeout.ms */ public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms"; -public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." + +public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in milliseconds that a transaction will remain open before the coordinator proactively aborts it. " + +"The start of the transaction is set at the time that the first partition is added to it. " + "If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error."; Review Comment: Done in [a1119c1](https://github.com/apache/kafka/pull/13129/commits/a1119c16f5d8c3cedf554d62e8112085b2f6)
[GitHub] [kafka] hachikuji commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets
hachikuji commented on code in PR #13129: URL: https://github.com/apache/kafka/pull/13129#discussion_r1081878202 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -318,7 +318,8 @@ public class ProducerConfig extends AbstractConfig { /** transaction.timeout.ms */ public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms"; -public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." + +public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in milliseconds that a transaction will remain open before the coordinator proactively aborts it. " + +"The start of the transaction is set at the time that the first partition is added to it. " + "If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error."; Review Comment: nit: could we put `` blocks around `transaction.max.timeout.ms` while we're in here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Gerrrr commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets
Gerrrr commented on code in PR #13129: URL: https://github.com/apache/kafka/pull/13129#discussion_r1081848092 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -319,6 +319,7 @@ public class ProducerConfig extends AbstractConfig { /** transaction.timeout.ms */ public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms"; public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." + +"The transaction status update happens on the first producer send, on adding new partitions to the transaction, and on commit. " + Review Comment: Thank you for the review! Updated in [420991c](https://github.com/apache/kafka/pull/13129/commits/420991c39c40a508ec6b7e87c3755597a67441e9).
[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1081788435 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -557,25 +557,25 @@ private[group] class GroupCoordinatorAdapter( } override def onElection( -partitionIndex: Int, -partitionLeaderEpoch: Int +consumerOffsetsPartitionIndex: Int, +consumerOffsetsPartitionLeaderEpoch: Int ): Unit = { -coordinator.onElection(partitionIndex, partitionLeaderEpoch) +coordinator.onElection(consumerOffsetsPartitionIndex, consumerOffsetsPartitionLeaderEpoch) } override def onResignation( -partitionIndex: Int, -partitionLeaderEpoch: OptionalInt +consumerOffsetsPartitionIndex: Int, +consumerOffsetsPartitionLeaderEpoch: OptionalInt ): Unit = { -coordinator.onResignation(partitionIndex, partitionLeaderEpoch) +coordinator.onResignation(consumerOffsetsPartitionIndex, consumerOffsetsPartitionLeaderEpoch) } - override def offsetsTopicConfigs(): Properties = { + override def consumerOffsetsTopicConfigs(): Properties = { coordinator.offsetsTopicConfigs } - override def startup(groupMetadataTopicPartitionCount: IntSupplier): Unit = { -coordinator.startup(() => groupMetadataTopicPartitionCount.getAsInt) + override def startup(consumerOffsetsPartitionCount: IntSupplier): Unit = { Review Comment: I don't think that we can use `groupMetadataTopic` as a prefix everywhere because in onElection/onResignation we really want to express the partition index. However, we could use `groupMetadata` as a prefix in those cases. I updated the PR, let me know what you think.
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1081688123 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -557,25 +557,25 @@ private[group] class GroupCoordinatorAdapter( } override def onElection( -partitionIndex: Int, -partitionLeaderEpoch: Int +consumerOffsetsPartitionIndex: Int, +consumerOffsetsPartitionLeaderEpoch: Int ): Unit = { -coordinator.onElection(partitionIndex, partitionLeaderEpoch) +coordinator.onElection(consumerOffsetsPartitionIndex, consumerOffsetsPartitionLeaderEpoch) } override def onResignation( -partitionIndex: Int, -partitionLeaderEpoch: OptionalInt +consumerOffsetsPartitionIndex: Int, +consumerOffsetsPartitionLeaderEpoch: OptionalInt ): Unit = { -coordinator.onResignation(partitionIndex, partitionLeaderEpoch) +coordinator.onResignation(consumerOffsetsPartitionIndex, consumerOffsetsPartitionLeaderEpoch) } - override def offsetsTopicConfigs(): Properties = { + override def consumerOffsetsTopicConfigs(): Properties = { coordinator.offsetsTopicConfigs } - override def startup(groupMetadataTopicPartitionCount: IntSupplier): Unit = { -coordinator.startup(() => groupMetadataTopicPartitionCount.getAsInt) + override def startup(consumerOffsetsPartitionCount: IntSupplier): Unit = { Review Comment: sorry to go back and forth on this one, but I wonder if we call it "GROUP_METADATA_TOPIC_NAME" in Topic.java, if we should actually just call it groupMetadataTopic here (and above). It's still more specific than "partition" in the above cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets
jolshan commented on code in PR #13129: URL: https://github.com/apache/kafka/pull/13129#discussion_r1081661935 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -319,6 +319,7 @@ public class ProducerConfig extends AbstractConfig { /** transaction.timeout.ms */ public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms"; public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." + +"The transaction status update happens on the first producer send, on adding new partitions to the transaction, and on commit. " + Review Comment: Ah -- I think I also had this confusion since there are notions of expiration and timeout. But yes, looking at your links I see that timeout is affected by the start timestamp only, while expiration is affected by the other components mentioned in the current description (adding partitions etc). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jeffkbkim commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1081656322 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - UNKNOWN_MEMBER_ID + // - FENCED_MEMBER_EPOCH + // - UNSUPPORTED_ASSIGNOR + // - UNRELEASED_INSTANCE_ID + // - GROUP_MAX_SIZE_REACHED + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, +{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, +{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." }, +{ "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." }, +{ "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+", + "about": "True if the member should compute the assignment for the group." }, +{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+", + "about": "The heartbeat interval in milliseconds." }, +{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if not provided; the assignment otherwise.", "fields": [ + { "name": "Error", "type": "int8", "versions": "0+", +"about": "The assigned error." }, + { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+", +"about": "The partitions assigned to the member that can be used immediately." }, + { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+", +"about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." }, + { "name": "MetadataVersion", "type": "int16", "versions": "0+", +"about": "The version of the metadata." }, + { "name": "MetadataBytes", "type": "bytes", "versions": "0+", +"about": "The assigned metadata." } +]} + ], + "commonStructs": [ Review Comment: aha - thanks! -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jeffkbkim commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1081655338 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - UNKNOWN_MEMBER_ID + // - FENCED_MEMBER_EPOCH + // - UNSUPPORTED_ASSIGNOR + // - UNRELEASED_INSTANCE_ID + // - GROUP_MAX_SIZE_REACHED Review Comment: yes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets
hachikuji commented on code in PR #13129: URL: https://github.com/apache/kafka/pull/13129#discussion_r1081629522 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -319,6 +319,7 @@ public class ProducerConfig extends AbstractConfig { /** transaction.timeout.ms */ public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms"; public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." + +"The transaction status update happens on the first producer send, on adding new partitions to the transaction, and on commit. " + Review Comment: I think this configuration is a bit misleading as currently documented. It looks to me like the implementation computes the timeout from the start of the transaction (i.e. the first call to `AddPartitionsToTxn`). We use `txnStartTimestamp` for this purpose here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L134. This value is only updated on the initial transition to the `Ongoing` state: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala#L331. I'd suggest we rephrase this configuration to something like this: > The maximum amount of time in milliseconds that a transaction will remain open before the coordinator proactively aborts. The start of the transaction is set at the time that the first partition is added to it. If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
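The timeout behaviour described in the comment above can be sketched as follows. This is a simplified model under stated assumptions, not the coordinator's implementation: `TxnTimeoutSketch` and its methods are hypothetical, and only the idea that the start timestamp is set once (when the first partition is added) and that the timeout is measured from it comes from the discussion.

```java
// Hypothetical model of a transaction's timing state; not Kafka's TransactionMetadata.
class TxnTimeoutSketch {
    private static final long NOT_STARTED = -1L;

    private final long timeoutMs;
    private long txnStartTimestamp = NOT_STARTED;
    private long lastUpdateTimestamp = NOT_STARTED;

    TxnTimeoutSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called each time partitions are added. Only the first call starts the
    // timeout clock; later calls refresh the last-update time only.
    void addPartitions(long nowMs) {
        if (txnStartTimestamp == NOT_STARTED) {
            txnStartTimestamp = nowMs;
        }
        lastUpdateTimestamp = nowMs;
    }

    // The timeout is measured from the start of the transaction, so later
    // activity does not extend it.
    boolean isTimedOut(long nowMs) {
        return txnStartTimestamp != NOT_STARTED && nowMs - txnStartTimestamp > timeoutMs;
    }

    long lastUpdateTimestamp() {
        return lastUpdateTimestamp;
    }

    public static void main(String[] args) {
        TxnTimeoutSketch txn = new TxnTimeoutSketch(60_000L);
        txn.addPartitions(0L);       // transaction starts here
        txn.addPartitions(50_000L);  // does not reset the timeout clock
        System.out.println(txn.isTimedOut(60_000L));
        System.out.println(txn.isTimedOut(60_001L));
    }
}
```

The second `addPartitions` call changes `lastUpdateTimestamp` but not the point at which `isTimedOut` becomes true, which is the distinction the proposed documentation wording tries to capture.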
[GitHub] [kafka] cadonna merged pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
cadonna merged PR #12821: URL: https://github.com/apache/kafka/pull/12821
[GitHub] [kafka] cadonna commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
cadonna commented on PR #12821: URL: https://github.com/apache/kafka/pull/12821#issuecomment-1397370205 Build failures are unrelated:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.KafkaConsumerTest.testReturnRecordsDuringRebalance()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStart
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
```
[GitHub] [kafka] kirktrue commented on a diff in pull request #13119: KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
kirktrue commented on code in PR #13119: URL: https://github.com/apache/kafka/pull/13119#discussion_r1081589480 ## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java: ## @@ -240,6 +240,9 @@ static String handleOutput(final HttpURLConnection con) throws IOException { int responseCode = con.getResponseCode(); log.debug("handleOutput - responseCode: {}", responseCode); +// NOTE: the contents of the response should not be logged so that we don't leak any +// sensitive data. +// TODO: is it OK to log the error response body and/or its formatted version? Review Comment: @smjn: Thanks for referencing the RFC and section. I updated the comments to include the link. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on PR #13112: URL: https://github.com/apache/kafka/pull/13112#issuecomment-1397338100 @jolshan Updated the PR.
[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1081583343 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ## @@ -215,4 +223,86 @@ CompletableFuture deleteOffsets( OffsetDeleteRequestData request, BufferSupplier bufferSupplier ); + +/** + * Return the partition index for the given Group. + * + * @param groupId The group id. + * + * @return The partition index. + */ +int partitionFor(String groupId); + +/** + * Commit or abort the pending transactional offsets for the given partitions. + * + * @param producerIdThe producer id. + * @param partitionsThe partitions. + * @param transactionResult The result of the transaction. + */ +void onTransactionCompleted( +long producerId, +Iterable partitions, +TransactionResult transactionResult +); + +/** + * Delete the provided partitions' offsets. + * + * @param topicPartitions The deleted partitions. + * @param bufferSupplierThe buffer supplier tight to the request thread. + */ +void onPartitionsDeleted( +List topicPartitions, +BufferSupplier bufferSupplier +); + +/** + * Group coordinator is now the leader for the given partition at the + * given leader epoch. It should load cached state from the partition + * and begin handling requests for groups mapped to it. + * + * @param partitionIndexThe partition index. + * @param partitionLeaderEpoch The leader epoch of the partition. Review Comment: That's right. It is all about the `__consumer_offsets`. `coordinatorEpoch` is the leader epoch of the partition. I used leader epoch here because it is a bit clearer to me. Let me update those names to be consistent. Good point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #13119: KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
kirktrue commented on code in PR #13119: URL: https://github.com/apache/kafka/pull/13119#discussion_r1081575409 ## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java: ## @@ -240,6 +240,9 @@ static String handleOutput(final HttpURLConnection con) throws IOException { int responseCode = con.getResponseCode(); log.debug("handleOutput - responseCode: {}", responseCode); +// NOTE: the contents of the response should not be logged so that we don't leak any +// sensitive data. +// TODO: is it OK to log the error response body and/or its formatted version? Review Comment: @smjn - No known instance that I'm aware of. A security review brought it to my attention, so addressing preemptively. The main fix is to remove the logging of the successful response, which would include outputting the access token to the logs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
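A minimal sketch of the idea behind the fix, assuming the goal is simply to keep the response body (which may carry an access token) out of log output. `SafeResponseLogSketch.describe` is a hypothetical helper, not a method of `HttpAccessTokenRetriever`, and it sidesteps the open TODO by never including the body at all, for success or error responses.

```java
// Hypothetical helper; not part of HttpAccessTokenRetriever.
class SafeResponseLogSketch {
    // Builds a log-safe description of an HTTP response. The body itself is
    // never included; only its length is reported, so tokens cannot leak.
    static String describe(int responseCode, String body) {
        int length = (body == null) ? 0 : body.length();
        return "responseCode: " + responseCode + ", bodyLength: " + length;
    }

    public static void main(String[] args) {
        // Example token response; the value is made up for illustration.
        String body = "{\"access_token\":\"secret\"}";
        System.out.println(describe(200, body));
    }
}
```

Whether error bodies may be logged is left open in the review thread; reporting only the status code and body length is the conservative choice made here.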
[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1081571516 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3448,7 +3445,7 @@ class KafkaApisTest { val expectedJoinGroupResponse = new JoinGroupResponseData() .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code) .setMemberId("member") - .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol) + .setProtocolName(if (version >= 7) null else kafka.coordinator.group.GroupCoordinator.NoProtocol) Review Comment: Yes, we should but I wanted to do this separately because I don't know yet where to put them.
[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1081571061 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -511,4 +532,53 @@ class GroupCoordinatorAdapter( future } + + override def partitionFor(groupId: String): Int = { +coordinator.partitionFor(groupId) + } + + override def onTransactionCompleted( +producerId: Long, +partitions: java.lang.Iterable[TopicPartition], +transactionResult: TransactionResult + ): Unit = { +coordinator.scheduleHandleTxnCompletion( + producerId, + partitions.asScala, + transactionResult +) + } + + override def onPartitionsDeleted( +topicPartitions: util.List[TopicPartition], +bufferSupplier: BufferSupplier + ): Unit = { +coordinator.handleDeletedPartitions(topicPartitions.asScala, RequestLocal(bufferSupplier)) Review Comment: We cannot use RequestLocal in the interface because it is part of core. Therefore, we have to pass the underlying buffer supplier and build the RequestLocal object here. Note that we did this for the other APIs that we have merged so far.
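The constraint described in this comment can be sketched in plain Java, under loudly stated assumptions: every type below (`BufferSource`, `CoreRequestLocal`, `CoordinatorApi`, `LegacyCoordinator`, `CoordinatorAdapter`) is a hypothetical stand-in, not a Kafka class. The shared interface only mentions the buffer source, and the adapter, which lives next to the core-only type, wraps it before delegating.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Stand-in for a type that is visible outside of core (hypothetical).
interface BufferSource {
    ByteBuffer get(int capacity);
}

// Stand-in for a core-only type that the shared interface must not mention.
final class CoreRequestLocal {
    final BufferSource bufferSource;

    CoreRequestLocal(BufferSource bufferSource) {
        this.bufferSource = bufferSource;
    }
}

// Shared interface: its signature uses only types visible outside of core.
interface CoordinatorApi {
    void onPartitionsDeleted(List<String> partitions, BufferSource bufferSource);
}

// Existing core-side component that expects the core-only wrapper.
final class LegacyCoordinator {
    final List<String> deleted = new ArrayList<>();
    CoreRequestLocal lastRequestLocal;

    void handleDeletedPartitions(List<String> partitions, CoreRequestLocal requestLocal) {
        deleted.addAll(partitions);
        lastRequestLocal = requestLocal;
    }
}

// Adapter living in core: builds the core-only object from the shared type.
final class CoordinatorAdapter implements CoordinatorApi {
    private final LegacyCoordinator coordinator;

    CoordinatorAdapter(LegacyCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    @Override
    public void onPartitionsDeleted(List<String> partitions, BufferSource bufferSource) {
        coordinator.handleDeletedPartitions(partitions, new CoreRequestLocal(bufferSource));
    }
}
```

The design choice is that callers outside core compile against `CoordinatorApi` alone, while the wrapping step stays in the one place that may depend on core types.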
[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1397311675 Take a look at `DumpLogSegments` to see how `CommandDefaultOptions` is used. It is much better than building the option list at the start of the `main/execute` method.
[GitHub] [kafka] bbejeck commented on pull request #8431: MINOR: Rename description of flatMapValues transformation
bbejeck commented on PR #8431: URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397212359 cherry-picked to 3.4
[GitHub] [kafka] mumrah merged pull request #13130: Fix upgrade compatibility issue from older versions to 3.4
mumrah merged PR #13130: URL: https://github.com/apache/kafka/pull/13130
[GitHub] [kafka] bbejeck commented on pull request #8431: MINOR: Rename description of flatMapValues transformation
bbejeck commented on PR #8431: URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397198456 Hi @maseiler, can you do the same PR for the [3.3 version of the streams developer guide](https://github.com/apache/kafka-site/blob/asf-site/33/streams/developer-guide/dsl-api.html)? Thanks!
[GitHub] [kafka] bbejeck commented on pull request #8431: MINOR: Rename description of flatMapValues transformation
bbejeck commented on PR #8431: URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397167523 Thanks for the contribution @maseiler !
[GitHub] [kafka] bbejeck commented on pull request #8431: MINOR: Rename description of flatMapValues transformation
bbejeck commented on PR #8431: URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397166959 Merged #8431 into trunk
[GitHub] [kafka] bbejeck merged pull request #8431: MINOR: Rename description of flatMapValues transformation
bbejeck merged PR #8431: URL: https://github.com/apache/kafka/pull/8431
[GitHub] [kafka] maseiler commented on pull request #8431: MINOR: Rename description of flatMapValues transformation
maseiler commented on PR #8431: URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397133912 @mjsax, I solved the merge conflict and rebased it on the latest version
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
clolov commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1081305288 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ## @@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); -ctrl.verify(); +// The unlock logic should still be executed. +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { -expect(stateManager.taskId()).andReturn(taskId); - -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - -stateDirectory.unlock(taskId); - -ctrl.checkOrder(true); -ctrl.replay(); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - +final InOrder inOrder = inOrder(stateManager, stateDirectory); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. 
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - -stateDirectory.unlock(taskId); -expectLastCall(); - -ctrl.checkOrder(true); -ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); -ctrl.verify(); +inOrder.verify(stateManager).close(); +inOrder.verify(stateDirectory).unlock(taskId); +verifyNoMoreInteractions(stateManager, stateDirectory); } @Test -public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { +public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("Close failed")); - -expect(stateManager.baseDir()).andReturn(randomFile); -Utils.delete(randomFile); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); +when(stateManager.baseDir()).thenReturn(randomFile); -stateDirectory.unlock(taskId); -expectLastCall(); +try (MockedStatic utils = mockStatic(Utils.class)) { +assertThrows(ProcessorStateException.class, () -> +StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); +} -ctrl.checkOrder(true); -ctrl.replay(); - -replayAll(); - -assertThrows(ProcessorStateException.class, () -> -StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test -public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { +public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() { final File unknownFile = new File("/unknown/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - -expect(stateManager.baseDir()).andReturn(unknownFile); - -Utils.delete(unknownFile); -expectLastCall().andThrow(new IOException("Deletion failed")); +
[GitHub] [kafka] cadonna merged pull request #12818: KAFKA-14133: Replace EasyMock with Mockito in streams tests
cadonna merged PR #12818: URL: https://github.com/apache/kafka/pull/12818
[GitHub] [kafka] cadonna commented on pull request #12818: KAFKA-14133: Replace EasyMock with Mockito in streams tests
cadonna commented on PR #12818: URL: https://github.com/apache/kafka/pull/12818#issuecomment-1397015212 Build failures are unrelated:
```
Build / JDK 11 and Scala 2.13 / kafka.controller.ControllerIntegrationTest.testPartitionReassignmentToBrokerWithOfflineLogDir()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad()
```
[GitHub] [kafka] fvaleri commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
fvaleri commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1081230571 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: @cadonna yes, there is some confusion on which tools need to be migrated. For example, `DumpLogSegments` has multiple dependencies on `core` classes, so it should be excluded, but was added as a sub task. Checking with @mimaison how we want to proceed with these. All the rest can be migrated.
[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1396923697 @clolov @vamossagar12 @tinaselenge
[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1396922639 @mimaison @ijuma
[GitHub] [kafka] fvaleri opened a new pull request, #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri opened a new pull request, #13131: URL: https://github.com/apache/kafka/pull/13131 These classes are required by most commands, so they must be migrated first.
[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
cadonna commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1081170724 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ## @@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); -ctrl.verify(); +// The unlock logic should still be executed. +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { -expect(stateManager.taskId()).andReturn(taskId); - -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - -stateDirectory.unlock(taskId); - -ctrl.checkOrder(true); -ctrl.replay(); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - +final InOrder inOrder = inOrder(stateManager, stateDirectory); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. 
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - -stateDirectory.unlock(taskId); -expectLastCall(); - -ctrl.checkOrder(true); -ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); -ctrl.verify(); +inOrder.verify(stateManager).close(); +inOrder.verify(stateDirectory).unlock(taskId); +verifyNoMoreInteractions(stateManager, stateDirectory); } @Test -public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { +public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("Close failed")); - -expect(stateManager.baseDir()).andReturn(randomFile); -Utils.delete(randomFile); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); +when(stateManager.baseDir()).thenReturn(randomFile); -stateDirectory.unlock(taskId); -expectLastCall(); +try (MockedStatic utils = mockStatic(Utils.class)) { +assertThrows(ProcessorStateException.class, () -> +StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); +} -ctrl.checkOrder(true); -ctrl.replay(); - -replayAll(); - -assertThrows(ProcessorStateException.class, () -> -StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test -public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { +public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() { final File unknownFile = new File("/unknown/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - -expect(stateManager.baseDir()).andReturn(unknownFile); - -Utils.delete(unknownFile); -expectLastCall().andThrow(new IOException("Deletion failed")); +
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1081145849 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: I've gone ahead and made the changes to convert the `KafkaConfigBackingStore` APIs to be synchronous even when EOS is disabled (thus making the behavior more in-line with the EOS case). This simplifies things significantly while also providing consistency w.r.t error handling across all the `KafkaConfigBackingStore` APIs without making invasive changes. ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) { try { fencableProducer.beginTransaction(); -fencableProducer.send(new ProducerRecord<>(topic, key, value)); +fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { Review Comment: I've removed the usage of the producer callback here since we're moving to synchronous usage of producer send in the non-EOS case as well anyway (aside from the earlier point that it doesn't really make sense to handle errors via both a callback as well as `commitTransaction`). The behavior of surfacing exceptions synchronously is similar in both cases now; one through calling `get()` on the returned future from `Producer::send` and the other through `Producer::commitTransaction`. > Another option for the above issue could be changing the exception mapper to concatenate all the exception messages from the exception chain. 
Yet another option for this could be to simply append a "Check the worker logs for more details on the error" to the top level exception's message in the REST API response (the worker logs will have the entire exception chain). Thoughts?
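Two of the ideas discussed above can be sketched with the JDK alone: surfacing a send failure synchronously by blocking on the returned future, and concatenating the messages of an exception chain for an error response. This is only an illustration under assumptions: `SyncSendSketch`, `awaitSend` and `chainMessages` are hypothetical names, a `CompletableFuture` stands in for the future returned by `Producer::send`, and `IllegalStateException` stands in for whatever exception type Connect would actually throw.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; not part of KafkaConfigBackingStore.
final class SyncSendSketch {

    private SyncSendSketch() {
    }

    // Blocks until the (stand-in) send completes and rethrows any failure
    // in the caller's thread instead of reporting it through a callback.
    static void awaitSend(CompletableFuture<?> sendResult, long timeoutMs) {
        try {
            sendResult.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to write to the config topic", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("Timed out writing to the config topic", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while writing to the config topic", e);
        }
    }

    // Joins the messages of a throwable and its causes, outermost first.
    // The depth bound guards against cyclic cause chains.
    static String chainMessages(Throwable error) {
        StringBuilder joined = new StringBuilder();
        int depth = 0;
        for (Throwable current = error; current != null && depth < 16; current = current.getCause(), depth++) {
            if (joined.length() > 0) {
                joined.append(": ");
            }
            String message = current.getMessage();
            joined.append(message != null ? message : current.getClass().getSimpleName());
        }
        return joined.toString();
    }
}
```

With this shape, a REST-layer exception mapper could call `chainMessages` on the caught exception, or keep the top-level message and append a pointer to the worker logs as suggested in the comment.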
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
clolov commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1081139070 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ## @@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); -ctrl.verify(); +// The unlock logic should still be executed. +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { -expect(stateManager.taskId()).andReturn(taskId); - -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - -stateDirectory.unlock(taskId); - -ctrl.checkOrder(true); -ctrl.replay(); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - +final InOrder inOrder = inOrder(stateManager, stateDirectory); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. 
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - -stateDirectory.unlock(taskId); -expectLastCall(); - -ctrl.checkOrder(true); -ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); -ctrl.verify(); +inOrder.verify(stateManager).close(); +inOrder.verify(stateDirectory).unlock(taskId); +verifyNoMoreInteractions(stateManager, stateDirectory); } @Test -public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { +public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("Close failed")); - -expect(stateManager.baseDir()).andReturn(randomFile); -Utils.delete(randomFile); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); +when(stateManager.baseDir()).thenReturn(randomFile); -stateDirectory.unlock(taskId); -expectLastCall(); +try (MockedStatic utils = mockStatic(Utils.class)) { +assertThrows(ProcessorStateException.class, () -> +StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); +} -ctrl.checkOrder(true); -ctrl.replay(); - -replayAll(); - -assertThrows(ProcessorStateException.class, () -> -StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test -public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { +public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() { final File unknownFile = new File("/unknown/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - -expect(stateManager.baseDir()).andReturn(unknownFile); - -Utils.delete(unknownFile); -expectLastCall().andThrow(new IOException("Deletion failed")); +
[GitHub] [kafka] fvaleri commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
fvaleri commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1081057443 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: Yes I'm working on https://issues.apache.org/jira/browse/KAFKA-14628 and really close to opening the PR.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
vamossagar12 commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1081021244 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: >I think there they talk about the dependencies of tools' test module not of the tools per se. Oh Ok. Got it. Thanks.
[GitHub] [kafka] cadonna commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
cadonna commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1081015656 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: Ah, great! Thanks for sharing! Good to know that people are aware! > Also, not sure if this comment from Ismael has any relevance here: https://github.com/apache/kafka/pull/13095#issuecomment-1376021193? I think there they talk about the dependencies of tools' test module not of the tools per se.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
vamossagar12 commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1081010418 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: Also, not sure if this comment from Ismael has any relevance here: https://github.com/apache/kafka/pull/13095#issuecomment-1376021193?
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
vamossagar12 commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1081009344 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: Thanks @clolov, I was about to type the same thing :D So this PR and I guess your PR are blocked until https://github.com/apache/kafka/pull/13122 is merged?
[GitHub] [kafka] clolov commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
clolov commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1081007952 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: Hello, hello, chipping in to say this is the same conversation we had with @fvaleri in https://github.com/apache/kafka/pull/13122. There should be a separate PR soon containing changes for CommandLineUtils (et al.) to unblock all JIRA tickets depending on it 😊
[GitHub] [kafka] cadonna commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
cadonna commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1081000404 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: Ticket https://issues.apache.org/jira/browse/KAFKA-14525 (the parent of https://issues.apache.org/jira/browse/KAFKA-14586) says > tools that don't require access to `core` classes and communicate via the kafka protocol (typically by using the client classes) should be moved to the `tools` module. This addition contradicts that requirement. As far as I can see, `kafka.utils.CommandLineUtils` is the only dependency on `core`. Is that true? Would it be possible to move `kafka.utils.CommandLineUtils` to a different module? Or should we even get rid of that dependency completely by rewriting it in Java and putting it in a different module? https://issues.apache.org/jira/browse/KAFKA-14576 should have a similar issue, since the console consumer should also be moved to tools and it has a dependency on `kafka.utils.CommandLineUtils`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
cadonna commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1080946697 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ## @@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); -ctrl.verify(); +// The unlock logic should still be executed. +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { -expect(stateManager.taskId()).andReturn(taskId); - -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - -stateDirectory.unlock(taskId); - -ctrl.checkOrder(true); -ctrl.replay(); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - +final InOrder inOrder = inOrder(stateManager, stateDirectory); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. 
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - -stateDirectory.unlock(taskId); -expectLastCall(); - -ctrl.checkOrder(true); -ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); -ctrl.verify(); +inOrder.verify(stateManager).close(); +inOrder.verify(stateDirectory).unlock(taskId); +verifyNoMoreInteractions(stateManager, stateDirectory); } @Test -public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { +public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("Close failed")); - -expect(stateManager.baseDir()).andReturn(randomFile); -Utils.delete(randomFile); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); +when(stateManager.baseDir()).thenReturn(randomFile); -stateDirectory.unlock(taskId); -expectLastCall(); +try (MockedStatic utils = mockStatic(Utils.class)) { +assertThrows(ProcessorStateException.class, () -> +StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); +} -ctrl.checkOrder(true); -ctrl.replay(); - -replayAll(); - -assertThrows(ProcessorStateException.class, () -> -StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test -public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { +public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() { final File unknownFile = new File("/unknown/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - -expect(stateManager.baseDir()).andReturn(unknownFile); - -Utils.delete(unknownFile); -expectLastCall().andThrow(new IOException("Deletion failed")); +
[GitHub] [kafka] cadonna commented on pull request #12739: Replace EasyMock and PowerMock with Mockito | TimeOrderedCachingPersistentWindowStoreTest
cadonna commented on PR #12739: URL: https://github.com/apache/kafka/pull/12739#issuecomment-1396582822 @shekhar-rajak Do you have any updates for this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13127: Kafka 14586: Moving StreamResetter to tools
vamossagar12 commented on PR #13127: URL: https://github.com/apache/kafka/pull/13127#issuecomment-1396439952 Adding @fvaleri. There are checkstyle issues, which I will fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests
philipnee commented on code in PR #13021: URL: https://github.com/apache/kafka/pull/13021#discussion_r1080743797 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.stream.Collectors; + +public class CommitRequestManager implements RequestManager { +private final Queue stagedCommits; +// TODO: We will need to refactor the subscriptionState +private final SubscriptionState subscriptionState; +private final Logger log; +private final Optional autoCommitState; Review Comment: It makes sense to use a MaxValue as well, my counter argument is, i think explicitly disabling autoCommitState makes the logic more straightforward. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
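To make the trade-off in the comment above concrete, here is a minimal sketch, not the PR's code. `AutoCommitSketch`, `State`, `shouldCommit`, and `disabledBySentinel` are hypothetical names introduced for illustration. It contrasts an `Optional`-wrapped state (auto-commit explicitly disabled) with a `Long.MAX_VALUE` interval used as a "never fires" sentinel.

```java
import java.util.Optional;

// Hypothetical illustration only; none of these names come from the PR.
final class AutoCommitSketch {
    // Minimal stand-in for an auto-commit timer: due once intervalMs has elapsed.
    static final class State {
        private final long intervalMs;
        private final long lastCommitMs;

        State(long intervalMs, long nowMs) {
            this.intervalMs = intervalMs;
            this.lastCommitMs = nowMs;
        }

        boolean isDue(long nowMs) {
            // Subtraction form avoids overflow when intervalMs is Long.MAX_VALUE.
            return nowMs - lastCommitMs >= intervalMs;
        }
    }

    // Option A: "disabled" means there is no state at all.
    static boolean shouldCommit(Optional<State> state, long nowMs) {
        return state.map(s -> s.isDue(nowMs)).orElse(false);
    }

    // Option B: "disabled" is encoded as a timer that effectively never fires.
    static State disabledBySentinel(long nowMs) {
        return new State(Long.MAX_VALUE, nowMs);
    }

    public static void main(String[] args) {
        Optional<State> disabled = Optional.empty();
        Optional<State> enabled = Optional.of(new State(5_000L, 0L));
        System.out.println(shouldCommit(disabled, 10_000L));
        System.out.println(shouldCommit(enabled, 10_000L));
        System.out.println(disabledBySentinel(0L).isDue(10_000L));
    }
}
```

With option A the disabled case is visible in the type, so callers cannot forget it; with option B every caller takes the same code path, but "disabled" is only a convention on the interval value.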
[GitHub] [kafka] akhileshchg commented on a diff in pull request #13130: Fix upgrade compatibility issue from older versions to 3.4
akhileshchg commented on code in PR #13130: URL: https://github.com/apache/kafka/pull/13130#discussion_r1080739852 ## core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala: ## @@ -157,7 +157,8 @@ case class MetaProperties( object BrokerMetadataCheckpoint extends Logging { def getBrokerMetadataAndOfflineDirs( logDirs: collection.Seq[String], -ignoreMissing: Boolean +ignoreMissing: Boolean, +kraftMode: Boolean = false Review Comment: Makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13130: Fix upgrade compatibility issue from older versions to 3.4
ijuma commented on code in PR #13130: URL: https://github.com/apache/kafka/pull/13130#discussion_r1080736930 ## core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala: ## @@ -157,7 +157,8 @@ case class MetaProperties( object BrokerMetadataCheckpoint extends Logging { def getBrokerMetadataAndOfflineDirs( logDirs: collection.Seq[String], -ignoreMissing: Boolean +ignoreMissing: Boolean, +kraftMode: Boolean = false Review Comment: Default arguments like this make it easy to miss places that should be updated. Better to be explicit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akhileshchg opened a new pull request, #13130: Fix upgrade compatibility issue from older versions to 3.4
akhileshchg opened a new pull request, #13130: URL: https://github.com/apache/kafka/pull/13130 3.4 introduced a change that requires cluster.id to be present in meta.properties if the file is available. This information is not persisted by the brokers in old versions (< 0.10). So on upgrade, the requirement check fails and halts the broker start-up. Fixed the requirement to ensure cluster.id is not required in zk mode on upgrade. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
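The relaxed requirement described in this PR can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual `BrokerMetadataCheckpoint` logic: `MetaPropertiesCheckSketch` and `isAcceptable` are hypothetical names, and the only detail taken from the description is that `cluster.id` may be absent from meta.properties in ZK mode.

```java
import java.util.Properties;

// Hypothetical illustration only; the real check lives in Scala in BrokerMetadataCheckpoint.
final class MetaPropertiesCheckSketch {
    // Returns true when an existing meta.properties file is acceptable at start-up.
    static boolean isAcceptable(Properties metaProperties, boolean kraftMode) {
        boolean hasClusterId = metaProperties.getProperty("cluster.id") != null;
        // Assumption: KRaft mode always needs cluster.id, while ZK mode tolerates
        // its absence so that files written by old brokers do not block an upgrade.
        return hasClusterId || !kraftMode;
    }

    public static void main(String[] args) {
        Properties oldFile = new Properties();
        oldFile.setProperty("broker.id", "1"); // file from an old broker, no cluster.id
        System.out.println(isAcceptable(oldFile, false));
        System.out.println(isAcceptable(oldFile, true));
    }
}
```

Passing `kraftMode` explicitly at every call site, rather than defaulting it, matches the review feedback on this PR that default arguments make it easy to miss callers that need updating.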
[GitHub] [kafka] Gerrrr opened a new pull request, #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets
Gerrrr opened a new pull request, #13129: URL: https://github.com/apache/kafka/pull/13129 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests
hachikuji commented on code in PR #13021: URL: https://github.com/apache/kafka/pull/13021#discussion_r1080689650 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.stream.Collectors; + +public class CommitRequestManager implements RequestManager { +private final Queue stagedCommits; +// TODO: We will need to refactor the subscriptionState +private final SubscriptionState subscriptionState; +private final Logger log; +private final Optional autoCommitState; +private final Optional coordinatorRequestManager; +private final GroupStateManager groupState; + +public CommitRequestManager( +final Time time, +final LogContext logContext, +final SubscriptionState subscriptionState, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final GroupStateManager groupState) { +this.log = logContext.logger(getClass()); +this.stagedCommits = new LinkedList<>(); +if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { +final long autoCommitInterval = + Integer.toUnsignedLong(config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); +this.autoCommitState = Optional.of(new AutoCommitState(time, autoCommitInterval)); 
+} else { +this.autoCommitState = Optional.empty(); +} +this.coordinatorRequestManager = Optional.ofNullable(coordinatorRequestManager); +this.groupState = groupState; +this.subscriptionState = subscriptionState; +} + +// Visible for testing +CommitRequestManager( +final Time time, +final LogContext logContext, +final SubscriptionState subscriptionState, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final GroupStateManager groupState, +final AutoCommitState autoCommitState) { +this.log = logContext.logger(getClass()); +this.subscriptionState = subscriptionState; +this.coordinatorRequestManager = Optional.ofNullable(coordinatorRequestManager); +this.groupState = groupState; +this.autoCommitState = Optional.ofNullable(autoCommitState); +this.stagedCommits = new LinkedList<>(); +} + +/** + * Poll for the commit request if there's any. The function will also try to autocommit, if enabled. + * + * @param currentTimeMs + * @return + */ +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +if (!coordinatorRequestManager.isPresent()) { +return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()); +} + +maybeAutoCommit(currentTimeMs); + +if (stagedCommits.isEmpty()) { +return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()); +} + +List unsentCommitRequests = + stagedCommits.stream().map(StagedCommit::toUnsent
[GitHub] [kafka] philipnee commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown
philipnee commented on PR #13125: URL: https://github.com/apache/kafka/pull/13125#issuecomment-1396238469 Thanks for the PR and the issue @yufeiyan1220 - I wonder if the clean up is necessary, as the metrics will be closed upon the client closing. Willing to hear what others say. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1080666149 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ## @@ -215,4 +223,86 @@ CompletableFuture deleteOffsets( OffsetDeleteRequestData request, BufferSupplier bufferSupplier ); + +/** + * Return the partition index for the given Group. + * + * @param groupId The group id. + * + * @return The partition index. + */ +int partitionFor(String groupId); + +/** + * Commit or abort the pending transactional offsets for the given partitions. + * + * @param producerIdThe producer id. + * @param partitionsThe partitions. + * @param transactionResult The result of the transaction. + */ +void onTransactionCompleted( +long producerId, +Iterable partitions, +TransactionResult transactionResult +); + +/** + * Delete the provided partitions' offsets. + * + * @param topicPartitions The deleted partitions. + * @param bufferSupplierThe buffer supplier tight to the request thread. + */ +void onPartitionsDeleted( +List topicPartitions, +BufferSupplier bufferSupplier +); + +/** + * Group coordinator is now the leader for the given partition at the + * given leader epoch. It should load cached state from the partition + * and begin handling requests for groups mapped to it. + * + * @param partitionIndexThe partition index. + * @param partitionLeaderEpoch The leader epoch of the partition. Review Comment: It seems like in some places we avoid mentioning consumer offsets, but in other places we do mention the topic partitions. (see below for the configs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1080665363 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ## @@ -215,4 +223,86 @@ CompletableFuture deleteOffsets( OffsetDeleteRequestData request, BufferSupplier bufferSupplier ); + +/** + * Return the partition index for the given Group. + * + * @param groupId The group id. + * + * @return The partition index. + */ +int partitionFor(String groupId); + +/** + * Commit or abort the pending transactional offsets for the given partitions. + * + * @param producerIdThe producer id. + * @param partitionsThe partitions. + * @param transactionResult The result of the transaction. + */ +void onTransactionCompleted( +long producerId, +Iterable partitions, +TransactionResult transactionResult +); + +/** + * Delete the provided partitions' offsets. + * + * @param topicPartitions The deleted partitions. + * @param bufferSupplierThe buffer supplier tight to the request thread. + */ +void onPartitionsDeleted( +List topicPartitions, +BufferSupplier bufferSupplier +); + +/** + * Group coordinator is now the leader for the given partition at the + * given leader epoch. It should load cached state from the partition + * and begin handling requests for groups mapped to it. + * + * @param partitionIndexThe partition index. + * @param partitionLeaderEpoch The leader epoch of the partition. Review Comment: A few questions: is this the coordinator's state topic partition, i.e. __consumer_offsets, or a different partition? I also noticed that in the current group coordinator implementation we call the second parameter "coordinatorEpoch". Is that equivalent to the leader epoch for the partition it's managing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1080662718 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ## @@ -215,4 +223,86 @@ CompletableFuture deleteOffsets( OffsetDeleteRequestData request, BufferSupplier bufferSupplier ); + +/** + * Return the partition index for the given Group. + * + * @param groupId The group id. + * + * @return The partition index. + */ +int partitionFor(String groupId); + +/** + * Commit or abort the pending transactional offsets for the given partitions. + * + * @param producerIdThe producer id. + * @param partitionsThe partitions. + * @param transactionResult The result of the transaction. + */ +void onTransactionCompleted( +long producerId, +Iterable partitions, +TransactionResult transactionResult +); + +/** + * Delete the provided partitions' offsets. Review Comment: Is it clearer to say "Remove the provided deleted partitions offsets"? I guess the parameter explains that it was deleted partitions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1080661020 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3448,7 +3445,7 @@ class KafkaApisTest { val expectedJoinGroupResponse = new JoinGroupResponseData() .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code) .setMemberId("member") - .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol) + .setProtocolName(if (version >= 7) null else kafka.coordinator.group.GroupCoordinator.NoProtocol) Review Comment: Did we want to move these constants too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] erichaagdev opened a new pull request, #13128: MINOR: Define a root project name in the Gradle settings file
erichaagdev opened a new pull request, #13128: URL: https://github.com/apache/kafka/pull/13128 It is a good practice to always define a root project name. However, this change is specifically being made to address a build caching miss as a result of not having the root project name defined. The `aggregatedJavadoc` task takes in the root project name as an input. When executing the build from two uniquely named directories, we will see the second execution of `aggregatedJavadoc` will be re-executed due to computed build cache key being different. Here are the steps to reproduce the issue: 1. Remove the contents of the Gradle local build cache with `rm -rf ~/.gradle/caches/build-cache-1`. WARNING: This is a destructive action, consider renaming the directory instead if you need to preserve the local build cache. 2. Clone and execute the project with `git clone --depth 1 g...@github.com:apache/kafka.git first_build && (cd first_build && ./gradlew aggregatedJavadoc --build-cache)`. 3. Clone and execute the project again with `git clone --depth 1 g...@github.com:apache/kafka.git second_build && (cd second_build && ./gradlew aggregatedJavadoc --build-cache --scan)`. 4. Agree to the Gradle Terms of Service when prompted to publish a Build Scan. 5. We can see on the Performance > Task execution screen that [a cacheable task](https://scans.gradle.com/s/7bnq7gb4ykmdk/timeline?cacheability=cacheable&hide-timeline&outcome=success,failed&sort=longest) has been executed. We can click the Cacheable link to find [the culprit](https://scans.gradle.com/s/7bnq7gb4ykmdk/timeline?cacheability=cacheable&hide-timeline&outcome=success,failed&sort=longest#lhlgorbepbsx4). ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1080653448 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -511,4 +532,53 @@ class GroupCoordinatorAdapter( future } + + override def partitionFor(groupId: String): Int = { +coordinator.partitionFor(groupId) + } + + override def onTransactionCompleted( +producerId: Long, +partitions: java.lang.Iterable[TopicPartition], +transactionResult: TransactionResult + ): Unit = { +coordinator.scheduleHandleTxnCompletion( + producerId, + partitions.asScala, + transactionResult +) + } + + override def onPartitionsDeleted( +topicPartitions: util.List[TopicPartition], +bufferSupplier: BufferSupplier + ): Unit = { +coordinator.handleDeletedPartitions(topicPartitions.asScala, RequestLocal(bufferSupplier)) Review Comment: also any reason for the change from request local as a parameter to creating it here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #12922: KAFKA-14397; Don't reset producer sequence number after delivery timeout
hachikuji commented on PR #12922: URL: https://github.com/apache/kafka/pull/12922#issuecomment-1396203599 Note I'm holding off on merging this patch because I realized, when writing the test case that Justine suggested, that the code was not correctly handling the case when the next in-line batch fails due to retries or timeouts. Basically we do not reset the next expected sequence number and some other state. I'll try to work on this in the next couple of weeks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rishiraj88 commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
rishiraj88 commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1080573968 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int, */ private def maybeUpdateCoordinatorEpoch( partitionId: Int, -epochOpt: Option[Int] +epochOpt: OptionalInt ): Boolean = { val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => { if (currentEpoch == null) { -epochOpt.map(Int.box).orNull +if (epochOpt.isPresent) epochOpt.getAsInt +else null } else { -epochOpt match { - case Some(epoch) if epoch > currentEpoch => epoch - case _ => currentEpoch -} +if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) epochOpt.getAsInt +else currentEpoch } }) -epochOpt.forall(_ == updatedEpoch) +if(epochOpt.isPresent) { Review Comment: The point made by @jolshan looks helpful.
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1080547019 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int, */ private def maybeUpdateCoordinatorEpoch( partitionId: Int, -epochOpt: Option[Int] +epochOpt: OptionalInt ): Boolean = { val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => { if (currentEpoch == null) { -epochOpt.map(Int.box).orNull +if (epochOpt.isPresent) epochOpt.getAsInt +else null } else { -epochOpt match { - case Some(epoch) if epoch > currentEpoch => epoch - case _ => currentEpoch -} +if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) epochOpt.getAsInt +else currentEpoch } }) -epochOpt.forall(_ == updatedEpoch) +if(epochOpt.isPresent) { Review Comment: Yeah. I was only suggesting because the other provides a few more methods (like getOrElse letting you return null instead of an int) but yeah, probably not worth changing.
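For readers following the `OptionalInt` versus boxed `Optional` point above, here is a minimal sketch of the method difference being described. It assumes the "other" type is `java.util.Optional[Integer]`, whose fallback method is `orElse` (the `getOrElse` name in the comment is the Scala `Option` spelling). The object and method names below are hypothetical and are not part of the PR.

```scala
import java.util.{Optional, OptionalInt}

// Hypothetical helper object, for illustration only.
object OptionalComparison {
  // OptionalInt wraps a primitive int, so the fallback must itself be an int.
  def orSentinel(epochOpt: OptionalInt): Int = epochOpt.orElse(-1)

  // Optional[Integer] wraps a boxed reference, so the fallback may be null.
  def orNull(epochOpt: Optional[Integer]): Integer = epochOpt.orElse(null)

  // Converting from the primitive form to the boxed form is spelled out by hand.
  def toBoxed(epochOpt: OptionalInt): Optional[Integer] =
    if (epochOpt.isPresent) Optional.of(Int.box(epochOpt.getAsInt))
    else Optional.empty[Integer]()
}
```

With the boxed form, the "absent means null" case in `maybeUpdateCoordinatorEpoch` could be a single `orElse(null)` call, at the cost of boxing every epoch that is passed in.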
[GitHub] [kafka] ijuma commented on pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on PR #13040: URL: https://github.com/apache/kafka/pull/13040#issuecomment-1387745094 @satishd No worries, it happens to all of us. :)
[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1074022302 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel, if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionState.deletePartition) { val leaderEpoch = if (partitionState.leaderEpoch >= 0) - Some(partitionState.leaderEpoch) Review Comment: The entire group coordinator in scala will disappear at some point, including the adapter.
[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1073982833 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int, */ private def maybeUpdateCoordinatorEpoch( partitionId: Int, -epochOpt: Option[Int] +epochOpt: OptionalInt ): Boolean = { val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => { if (currentEpoch == null) { -epochOpt.map(Int.box).orNull +if (epochOpt.isPresent) epochOpt.getAsInt +else null } else { -epochOpt match { - case Some(epoch) if epoch > currentEpoch => epoch - case _ => currentEpoch -} +if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) epochOpt.getAsInt +else currentEpoch } }) -epochOpt.forall(_ == updatedEpoch) +if(epochOpt.isPresent) { Review Comment: OptionalInt seems better here because we effectively pass an int, no?
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1073964326 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int, */ private def maybeUpdateCoordinatorEpoch( partitionId: Int, -epochOpt: Option[Int] +epochOpt: OptionalInt ): Boolean = { val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => { if (currentEpoch == null) { -epochOpt.map(Int.box).orNull +if (epochOpt.isPresent) epochOpt.getAsInt +else null } else { -epochOpt match { - case Some(epoch) if epoch > currentEpoch => epoch - case _ => currentEpoch -} +if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) epochOpt.getAsInt +else currentEpoch } }) -epochOpt.forall(_ == updatedEpoch) +if(epochOpt.isPresent) { Review Comment: There are minor method differences between OptionalInt and Optional. That was my point. But maybe not big enough to change.
[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1073960778 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int, */ private def maybeUpdateCoordinatorEpoch( partitionId: Int, -epochOpt: Option[Int] +epochOpt: OptionalInt ): Boolean = { val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => { if (currentEpoch == null) { -epochOpt.map(Int.box).orNull +if (epochOpt.isPresent) epochOpt.getAsInt +else null } else { -epochOpt match { - case Some(epoch) if epoch > currentEpoch => epoch - case _ => currentEpoch -} +if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) epochOpt.getAsInt +else currentEpoch } }) -epochOpt.forall(_ == updatedEpoch) +if(epochOpt.isPresent) { Review Comment: > I also wonder if there is a cleaner way to write the above methods with the OptionalInt. But perhaps not. This is the best I came up with. > I see the options here are limited. Did you consider using java optionals here too? I suppose those are a bit more heavyweight and maybe still don't provide much better methods. I don't understand your point here. We already use java optionals here now. ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -511,4 +532,53 @@ class GroupCoordinatorAdapter( future } + + override def partitionFor(groupId: String): Int = { +coordinator.partitionFor(groupId) + } + + override def onTransactionCompleted( Review Comment: I found the old name a bit weird. I have also tried to name all these "callbacks" similarly. They all start with `on` now.
[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
dajac commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1073958844 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -511,4 +532,53 @@ class GroupCoordinatorAdapter( future } + + override def partitionFor(groupId: String): Int = { +coordinator.partitionFor(groupId) + } + + override def onTransactionCompleted( Review Comment: I found the old name a bit weird.
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1073948248 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int, */ private def maybeUpdateCoordinatorEpoch( partitionId: Int, -epochOpt: Option[Int] +epochOpt: OptionalInt ): Boolean = { val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => { if (currentEpoch == null) { -epochOpt.map(Int.box).orNull +if (epochOpt.isPresent) epochOpt.getAsInt +else null } else { -epochOpt match { - case Some(epoch) if epoch > currentEpoch => epoch - case _ => currentEpoch -} +if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) epochOpt.getAsInt +else currentEpoch } }) -epochOpt.forall(_ == updatedEpoch) +if(epochOpt.isPresent) { Review Comment: I see the options here are limited. Did you consider using java optionals here too? I suppose those are a bit more heavyweight and maybe still don't provide much better methods.
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1073945942 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int, */ private def maybeUpdateCoordinatorEpoch( partitionId: Int, -epochOpt: Option[Int] +epochOpt: OptionalInt ): Boolean = { val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => { if (currentEpoch == null) { -epochOpt.map(Int.box).orNull +if (epochOpt.isPresent) epochOpt.getAsInt +else null } else { -epochOpt match { - case Some(epoch) if epoch > currentEpoch => epoch - case _ => currentEpoch -} +if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) epochOpt.getAsInt +else currentEpoch } }) -epochOpt.forall(_ == updatedEpoch) +if(epochOpt.isPresent) { Review Comment: nit: spacing. I also wonder if there is a cleaner way to write the above methods with the OptionalInt. But perhaps not.
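As a possible answer to the "cleaner way" question, the two nested branches in the diff can be folded into one three-way conditional. This is only a sketch under stated assumptions, not the code in the PR: `EpochTracker` is a hypothetical stand-in for the relevant part of `GroupMetadataManager`, and the map is assumed to be a `ConcurrentHashMap[Int, java.lang.Integer]`. The last line is meant to match the removed `epochOpt.forall(_ == updatedEpoch)`.

```scala
import java.util.OptionalInt
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the epoch tracking in GroupMetadataManager.
class EpochTracker {
  // Assumed map type: partition id -> boxed coordinator epoch.
  private val epochForPartitionId = new ConcurrentHashMap[Int, java.lang.Integer]()

  def maybeUpdateCoordinatorEpoch(partitionId: Int, epochOpt: OptionalInt): Boolean = {
    val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => {
      // No epoch supplied: keep whatever is stored (null means no mapping).
      if (!epochOpt.isPresent) currentEpoch
      // Nothing stored yet, or the supplied epoch is newer: store it.
      else if (currentEpoch == null || epochOpt.getAsInt > currentEpoch.intValue)
        Int.box(epochOpt.getAsInt)
      // The supplied epoch is not newer than the stored one: keep the stored one.
      else currentEpoch
    })
    // True when no epoch was supplied, or when the supplied epoch is the stored one.
    !epochOpt.isPresent || updatedEpoch.intValue == epochOpt.getAsInt
  }
}
```

Whether this reads better than the version in the diff is a judgment call. It trades the outer null check for an extra `isPresent` test, but keeps the behavior for an absent epoch in one place.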