[GitHub] [kafka] vamossagar12 commented on pull request #13127: KAFKA-14586: Moving StreamResetter to tools
vamossagar12 commented on PR #13127: URL: https://github.com/apache/kafka/pull/13127#issuecomment-1407272563 > @vamossagar12 Thanks for the PR! > > I had one major comment. Thanks @cadonna . I think that comment has been addressed now based on the conversation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: KAFKA-14586: Moving StreamResetter to tools
vamossagar12 commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1089609015 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: hi @cadonna , The PR for moving `CommandLineUtils` has been [merged to trunk](https://github.com/apache/kafka/pull/13131), meaning I don't need to include this dependency anymore. I have updated the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1407268172 > EndToEndLatencyService Thanks @mimaison . Actually I lack some context here. The class, `TestEndToEndLatency` was renamed to `EndToEndLatency` with this very old PR: https://github.com/apache/kafka/commit/e43c9aff92c57da6abb0c1d0af3431a550110a89#diff-52dbfa7ab683a53b91a84c35f309b56ff1b2a1cd94e4ccb86c5843e9e44a050f and in a subsequent PR, the support for zk_connect was removed. So, the line you highlighted above mayn't need to be changed. Also, there's no reference of `TestEndToEndLatency` in the project anymore in trunk so I am assuming it would be shipped with older version (< 0.9) maybe? Were you referring to this line instead? https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/end_to_end_latency.py#L127 which is invoked on line #91? I agree, that needs to be changed. Plz let me know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records
[ https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Beard updated KAFKA-14659: Priority: Minor (was: Major) > source-record-write-[rate|total] metrics include filtered records > - > > Key: KAFKA-14659 > URL: https://issues.apache.org/jira/browse/KAFKA-14659 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Beard >Priority: Minor > > Source tasks in Kafka connect offer two sets of metrics (documented in > [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]): > ||Metric||Description|| > |source-record-poll-rate|The average per-second number of records > produced/polled (before transformation) by this task belonging to the named > source connector in this worker.| > |source-record-write-rate|The average per-second number of records output > from the transformations and written to Kafka for this task belonging to the > named source connector in this worker. This is after transformations are > applied and excludes any records filtered out by the transformations.| > There are also corresponding "-total" metrics that capture the total number > of records polled and written for the metrics above, respectively. > In short, the "poll" metrics capture the number of messages sourced > pre-transformation/filtering, and the "write" metrics should capture the > number of messages ultimately written to Kafka post-transformation/filtering. > However, the implementation of the {{source-record-write-*}} metrics > _includes_ records filtered out by transformations (and also records that > result in produce failures with the config {{{}errors.tolerance=all{}}}). > h3. Details > In > [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397], > each source record is passed through the transformation chain where it is > potentially filtered out, checked to see if it was in fact filtered out, and > if so it is accounted for in the internal metrics via > {{{}counter.skipRecord(){}}}. > {code:java} > for (final SourceRecord preTransformRecord : toSend) { > retryWithToleranceOperator.sourceRecord(preTransformRecord); > final SourceRecord record = > transformationChain.apply(preTransformRecord); > final ProducerRecord producerRecord = > convertTransformedRecord(record); > if (producerRecord == null || retryWithToleranceOperator.failed()) { > > counter.skipRecord(); > recordDropped(preTransformRecord); > continue; > } > ... > {code} > {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows: > {code:java} > > public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup > metricsGroup) { > assert batchSize > 0; > assert metricsGroup != null; > this.batchSize = batchSize; > counter = batchSize; > this.metricsGroup = metricsGroup; > } > public void skipRecord() { > if (counter > 0 && --counter == 0) { > finishedAllWrites(); > } > } > > private void finishedAllWrites() { > if (!completed) { > metricsGroup.recordWrite(batchSize - counter); > completed = true; > } > } > {code} > For example: If a batch starts with 100 records, {{batchSize}} and > {{counter}} will both be initialized to 100. If all 100 records get filtered > out, {{counter}} will be decremented 100 times, and > {{{}finishedAllWrites(){}}}will record the value 100 to the underlying > {{source-record-write-*}} metrics rather than 0, the correct value according > to the documentation for these metrics. > h3. Solutions > Assuming the documentation correctly captures the intent of the > {{source-record-write-*}} metrics, it seems reasonable to fix these metrics > such that filtered records do not get counted. > It may also be useful to add additional metrics to capture the rate and total > number of records filtered out by transformations, which would require a KIP. > I'm not sure what the best way of accounting for produce failures in the case > of {{errors.tolerance=all}} is yet. Maybe these failures deserve their own > new metrics? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-12225) Unexpected broker bottleneck when scaling producers
[ https://issues.apache.org/jira/browse/KAFKA-12225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Beard updated KAFKA-12225: Priority: Major (was: Minor) > Unexpected broker bottleneck when scaling producers > --- > > Key: KAFKA-12225 > URL: https://issues.apache.org/jira/browse/KAFKA-12225 > Project: Kafka > Issue Type: Improvement > Components: core > Environment: AWS Based > 5-node cluster running on k8s with EBS attached disks (HDD) > Kafka Version 2.5.0 > Multiple Producers (KafkaStreams, Akka Streams, golang Sarama) >Reporter: Harel Ben Attia >Priority: Major > > > *TLDR*: There seems to be a major lock contention that can happen on > *{{Log.lock}}* during producer-scaling when produce-request sending is > time-based ({{linger.ms}}) and not data-size based (max batch size). > Hi, > We're running a 5-node Kafka cluster on one of our production systems on AWS. > Recently, we have started to notice that as our producer services scale out, > the Kafka idle-percentage drops abruptly from ~70% idle percentage to 0% on > all brokers, even though none of the physical resources of the brokers are > exhausted. > Initially, we realised that our {{io.thread}} count was too low, causing high > request queuing and the low idle percentage, so we have increased it, hoping > to see one of the physical resources maxing out. After changing it we still > continued to see abrupt drops of the idle-percentage to 0% (with no physical > resource maxing out), so we continued to investigate. > The investigation has shown that there's a direct relation to {{linger.ms}} > being the controlling factor of sending out produce requests. Whenever > messages are being sent out from the producer due to the {{linger.ms}} > threshold, scaling out the service increased the number of produce requests > in a way which is not proportional to our traffic increase, bringing down all > the brokers to a near-halt in terms of being able to process requests and, as > mentioned, without any exhaustion of physical resources. > After some more experiments and profiling a broker through flight recorder, > we have found out that the cause of the issue is a lock contention on a > *{{java.lang.Object}}*, wasting a lot of time on all the > {{data-plane-kafka-request-handler}} threads. 90% of the locks were on Log's > *{{lock: Object}}* instance, inside the *{{Log.append()}}* method. The stack > traces show that these locks occur during the {{handleProductRequest}} > method. We have ruled out replication as the source of the issues, as there > were no replication issues, and the control-plane has a separate thread pool, > so this focused us back on the actual producers, leading back to the > behaviour of our producer service when scaling out. > At that point we thought that maybe the issue is related to the number of > partitions of the topic (60 currently), and increasing it would reduce the > lock contention on each {{Log}} instance, but since each producer writes to > all partitions (data is evenly spread and not skewed), then increasing the > number of partitions would only cause each producer to generate more > produce-requests, not alleviating the lock contention. Also, increasing the > number of brokers would increase the idle percentage per broker, but > essentially would not help reducing the produce-request latency, since this > would not change the rate of produce-requests per Log. > Eventually, we've worked around the issue by making the {{linger.ms}} value > high enough so it stopped being the controlling factor of sending messages > (e.g. produce-requests became coupled to the size of the traffic due to the > max batch size becoming the controlling factor). This allowed us to utilise > the cluster better without upscaling it. > From our analysis, it seems that this lock behaviour limits Kafka's ability > to be robust to producer configuration and scaling, and hurts the ability to > do efficient capacity planning for the cluster, increasing the risk of an > unexpected bottleneck when traffic increases. > It would be great if you can validate these conclusions, or provide any more > information that will help us understand the issue better or work around it > in a more efficient way. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-12225) Unexpected broker bottleneck when scaling producers
[ https://issues.apache.org/jira/browse/KAFKA-12225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Beard updated KAFKA-12225: Priority: Minor (was: Major) > Unexpected broker bottleneck when scaling producers > --- > > Key: KAFKA-12225 > URL: https://issues.apache.org/jira/browse/KAFKA-12225 > Project: Kafka > Issue Type: Improvement > Components: core > Environment: AWS Based > 5-node cluster running on k8s with EBS attached disks (HDD) > Kafka Version 2.5.0 > Multiple Producers (KafkaStreams, Akka Streams, golang Sarama) >Reporter: Harel Ben Attia >Priority: Minor > > > *TLDR*: There seems to be a major lock contention that can happen on > *{{Log.lock}}* during producer-scaling when produce-request sending is > time-based ({{linger.ms}}) and not data-size based (max batch size). > Hi, > We're running a 5-node Kafka cluster on one of our production systems on AWS. > Recently, we have started to notice that as our producer services scale out, > the Kafka idle-percentage drops abruptly from ~70% idle percentage to 0% on > all brokers, even though none of the physical resources of the brokers are > exhausted. > Initially, we realised that our {{io.thread}} count was too low, causing high > request queuing and the low idle percentage, so we have increased it, hoping > to see one of the physical resources maxing out. After changing it we still > continued to see abrupt drops of the idle-percentage to 0% (with no physical > resource maxing out), so we continued to investigate. > The investigation has shown that there's a direct relation to {{linger.ms}} > being the controlling factor of sending out produce requests. Whenever > messages are being sent out from the producer due to the {{linger.ms}} > threshold, scaling out the service increased the number of produce requests > in a way which is not proportional to our traffic increase, bringing down all > the brokers to a near-halt in terms of being able to process requests and, as > mentioned, without any exhaustion of physical resources. > After some more experiments and profiling a broker through flight recorder, > we have found out that the cause of the issue is a lock contention on a > *{{java.lang.Object}}*, wasting a lot of time on all the > {{data-plane-kafka-request-handler}} threads. 90% of the locks were on Log's > *{{lock: Object}}* instance, inside the *{{Log.append()}}* method. The stack > traces show that these locks occur during the {{handleProductRequest}} > method. We have ruled out replication as the source of the issues, as there > were no replication issues, and the control-plane has a separate thread pool, > so this focused us back on the actual producers, leading back to the > behaviour of our producer service when scaling out. > At that point we thought that maybe the issue is related to the number of > partitions of the topic (60 currently), and increasing it would reduce the > lock contention on each {{Log}} instance, but since each producer writes to > all partitions (data is evenly spread and not skewed), then increasing the > number of partitions would only cause each producer to generate more > produce-requests, not alleviating the lock contention. Also, increasing the > number of brokers would increase the idle percentage per broker, but > essentially would not help reducing the produce-request latency, since this > would not change the rate of produce-requests per Log. > Eventually, we've worked around the issue by making the {{linger.ms}} value > high enough so it stopped being the controlling factor of sending messages > (e.g. produce-requests became coupled to the size of the traffic due to the > max batch size becoming the controlling factor). This allowed us to utilise > the cluster better without upscaling it. > From our analysis, it seems that this lock behaviour limits Kafka's ability > to be robust to producer configuration and scaling, and hurts the ability to > do efficient capacity planning for the cluster, increasing the risk of an > unexpected bottleneck when traffic increases. > It would be great if you can validate these conclusions, or provide any more > information that will help us understand the issue better or work around it > in a more efficient way. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records
Chris Beard created KAFKA-14659: --- Summary: source-record-write-[rate|total] metrics include filtered records Key: KAFKA-14659 URL: https://issues.apache.org/jira/browse/KAFKA-14659 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Chris Beard Source tasks in Kafka connect offer two sets of metrics (documented in [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]): ||Metric||Description|| |source-record-poll-rate|The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.| |source-record-write-rate|The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.| There are also corresponding "-total" metrics that capture the total number of records polled and written for the metrics above, respectively. In short, the "poll" metrics capture the number of messages sourced pre-transformation/filtering, and the "write" metrics should capture the number of messages ultimately written to Kafka post-transformation/filtering. However, the implementation of the {{source-record-write-*}} metrics _includes_ records filtered out by transformations (and also records that result in produce failures with the config {{{}errors.tolerance=all{}}}). h3. Details In [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397], each source record is passed through the transformation chain where it is potentially filtered out, checked to see if it was in fact filtered out, and if so it is accounted for in the internal metrics via {{{}counter.skipRecord(){}}}. {code:java} for (final SourceRecord preTransformRecord : toSend) { retryWithToleranceOperator.sourceRecord(preTransformRecord); final SourceRecord record = transformationChain.apply(preTransformRecord); final ProducerRecord producerRecord = convertTransformedRecord(record); if (producerRecord == null || retryWithToleranceOperator.failed()) { counter.skipRecord(); recordDropped(preTransformRecord); continue; } ... {code} {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows: {code:java} public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) { assert batchSize > 0; assert metricsGroup != null; this.batchSize = batchSize; counter = batchSize; this.metricsGroup = metricsGroup; } public void skipRecord() { if (counter > 0 && --counter == 0) { finishedAllWrites(); } } private void finishedAllWrites() { if (!completed) { metricsGroup.recordWrite(batchSize - counter); completed = true; } } {code} For example: If a batch starts with 100 records, {{batchSize}} and {{counter}} will both be initialized to 100. If all 100 records get filtered out, {{counter}} will be decremented 100 times, and {{{}finishedAllWrites(){}}}will record the value 100 to the underlying {{source-record-write-*}} metrics rather than 0, the correct value according to the documentation for these metrics. h3. Solutions Assuming the documentation correctly captures the intent of the {{source-record-write-*}} metrics, it seems reasonable to fix these metrics such that filtered records do not get counted. It may also be useful to add additional metrics to capture the rate and total number of records filtered out by transformations, which would require a KIP. I'm not sure what the best way of accounting for produce failures in the case of {{errors.tolerance=all}} is yet. Maybe these failures deserve their own new metrics? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1407228326 * We need a `ScramControlManagerTest` as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1407228067 We should not be changing `BrokerMetadataSnapshotterTest.scala` here -- doesn't seem needed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1407227921 Can we have a test in `ControllerApisTest`? These are mainly to verify that our authorization logic is sound. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
vcrfxia commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1089556608 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * + * + + , reverse-sorted by timestamp> + + * + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. + */ +final class RocksDBVersionedStoreSegmentValueFormatter { +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; + +/** + * @return the validTo timestamp of the latest record in the provided segment + */ +static long getNextTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(0); +} + +/** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. Review Comment: Hey @mjsax I just pushed an update to the serialization of "empty" segments (per your suggestion). The new serialization is `minTimestamp = nextTimestamp` with a single record version stored in the record versions list (tombstone value and timestamp equal to both `minTimestamp` and `nextTimestamp`). So, even though it's still a special case which cannot be combined with the regular case (as evidenced by the fact that the `insertAsLatest()` and `insertAsEarliest()` methods still have if-conditions to handle this case separately), now this special-casing is fully encapsulated within this value formatter utility class, i.e., the versioned store implementation itself (for which I'll open a PR as soon as this one is merged) does not need to concern itself with this special case (and therefore the public-facing `isEmpty()` methods have been removed accordingly). The updated javadocs in the latest commit contain more details. I've also renamed "empty" segment to "degenerate" segment, to clarify that the segment is not truly empty, it's just degenerate because the single tombstone in it has `validFrom = validTo`, which is not typically allowed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests
cmccabe commented on code in PR #13153: URL: https://github.com/apache/kafka/pull/13153#discussion_r1089524563 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs Review Comment: Can't use that here since `:clients` doesn't depend on `:server-common`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests
cmccabe commented on code in PR #13153: URL: https://github.com/apache/kafka/pull/13153#discussion_r1089524442 ## server-common/src/main/java/org/apache/kafka/server/util/Deadline.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.utils.Time; + +import java.math.BigInteger; +import java.util.Date; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + + +public class Deadline { +private final long nanoseconds; + +public static Deadline fromMonotonicNanoseconds( +long nanoseconds +) { +return new Deadline(nanoseconds); +} + +public static Deadline fromDelay( +Time time, +long delay, +TimeUnit timeUnit +) { +return fromDelay(time.nanoseconds(), delay, timeUnit); +} + +public static Deadline fromDelay( Review Comment: I'll remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests
hachikuji commented on code in PR #13153: URL: https://github.com/apache/kafka/pull/13153#discussion_r1089358150 ## server-common/src/main/java/org/apache/kafka/server/util/Deadline.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.utils.Time; + +import java.math.BigInteger; +import java.util.Date; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + + +public class Deadline { +private final long nanoseconds; + +public static Deadline fromMonotonicNanoseconds( +long nanoseconds +) { +return new Deadline(nanoseconds); +} + +public static Deadline fromDelay( +Time time, +long delay, +TimeUnit timeUnit +) { +return fromDelay(time.nanoseconds(), delay, timeUnit); +} + +public static Deadline fromDelay( Review Comment: Can this be private? Or if it needs to be accessible, maybe we can make it `fromDelayNs` ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs Review Comment: Does it make sense to use `Deadline` here? ## core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala: ## @@ -295,6 +295,8 @@ abstract class QuorumTestHarness extends Logging { throw new RuntimeException("Only one KRaft controller is supported for now.") } val props = propsList(0) +props.setProperty(KafkaConfig.BrokerServerMaxStartupTimeMs, TimeUnit.MINUTES.toMillis(10).toString) Review Comment: Seems like we need to update these references? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests
cmccabe commented on code in PR #13153: URL: https://github.com/apache/kafka/pull/13153#discussion_r1089352604 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1153,6 +1157,8 @@ object KafkaConfig { .define(MetadataMaxRetentionBytesProp, LONG, Defaults.MetadataMaxRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc) .define(MetadataMaxRetentionMillisProp, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, MetadataMaxRetentionMillisDoc) .define(MetadataMaxIdleIntervalMsProp, INT, Defaults.MetadataMaxIdleIntervalMs, atLeast(0), LOW, MetadataMaxIdleIntervalMsDoc) + .defineInternal(BrokerServerMaxStartupTimeMs, LONG, Defaults.BrokerServerMaxStartupTimeMs, atLeast(0), MEDIUM, BrokerServerMaxStartupTimeMsDoc) Review Comment: yeah ## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.math.BigInteger; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + + +public class FutureUtils { +/** + * Based on the current time and a delay, computes a monotonic deadline in the future. + * + * @param nowNs The current time in monotonic nanoseconds. + * @param delayMs The delay in milliseconds. + * @return The monotonic deadline in the future. This value is capped at + * Long.MAX_VALUE. + */ +public static long getDeadlineNsFromDelayMs( +long nowNs, +long delayMs +) { +if (delayMs < 0) { +throw new RuntimeException("Negative delays are not allowed."); +} +BigInteger delayNs = BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000)); +BigInteger deadlineNs = BigInteger.valueOf(nowNs).add(delayNs); +if (deadlineNs.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) >= 0) { +return Long.MAX_VALUE; +} else { +return deadlineNs.longValue(); +} +} + +/** + * Wait for a future until a specific time in the future, with copious logging. + * + * @param log The slf4j object to use to log success and failure. + * @param actionThe action we are waiting for. + * @param futureThe future we are waiting for. + * @param deadlineNsThe deadline in the future we are waiting for. + * @param time The clock object. + * @return The result of the future. + * @paramThe type of the future. + * + * @throws java.util.concurrent.TimeoutException If the future times out. + * @throws Throwable If the future fails. Note: we unwrap ExecutionException here. + */ +public static T waitWithLogging( +Logger log, +String action, +CompletableFuture future, +long deadlineNs, Review Comment: I will create a Deadline class to help with this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests
cmccabe commented on code in PR #13153: URL: https://github.com/apache/kafka/pull/13153#discussion_r1089313497 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { Review Comment: It's a bit unclear. It's traditional for condition variables to allow spurious wakeups, and probably this is implemented with a condition variable. The JavaDoc for `get` just says that it will wait "at most the given time" which doesn't really clarify matters. In any case, the loop certainly does no harm. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests
hachikuji commented on code in PR #13153: URL: https://github.com/apache/kafka/pull/13153#discussion_r1089298321 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1153,6 +1157,8 @@ object KafkaConfig { .define(MetadataMaxRetentionBytesProp, LONG, Defaults.MetadataMaxRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc) .define(MetadataMaxRetentionMillisProp, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, MetadataMaxRetentionMillisDoc) .define(MetadataMaxIdleIntervalMsProp, INT, Defaults.MetadataMaxIdleIntervalMs, atLeast(0), LOW, MetadataMaxIdleIntervalMsDoc) + .defineInternal(BrokerServerMaxStartupTimeMs, LONG, Defaults.BrokerServerMaxStartupTimeMs, atLeast(0), MEDIUM, BrokerServerMaxStartupTimeMsDoc) Review Comment: Seems like we could get away with one internal config. Maybe `process.max.startup.time.ms`? ## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.math.BigInteger; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + + +public class FutureUtils { +/** + * Based on the current time and a delay, computes a monotonic deadline in the future. + * + * @param nowNs The current time in monotonic nanoseconds. + * @param delayMs The delay in milliseconds. + * @return The monotonic deadline in the future. This value is capped at + * Long.MAX_VALUE. + */ +public static long getDeadlineNsFromDelayMs( +long nowNs, +long delayMs +) { +if (delayMs < 0) { +throw new RuntimeException("Negative delays are not allowed."); +} +BigInteger delayNs = BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000)); +BigInteger deadlineNs = BigInteger.valueOf(nowNs).add(delayNs); +if (deadlineNs.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) >= 0) { +return Long.MAX_VALUE; +} else { +return deadlineNs.longValue(); +} +} + +/** + * Wait for a future until a specific time in the future, with copious logging. + * + * @param log The slf4j object to use to log success and failure. + * @param actionThe action we are waiting for. + * @param futureThe future we are waiting for. + * @param deadlineNsThe deadline in the future we are waiting for. + * @param time The clock object. + * @return The result of the future. + * @paramThe type of the future. + * + * @throws java.util.concurrent.TimeoutException If the future times out. + * @throws Throwable If the future fails. Note: we unwrap ExecutionException here. + */ +public static T waitWithLogging( +Logger log, +String action, +CompletableFuture future, +long deadlineNs, Review Comment: Wonder if we could use a `Duration` object or something like that to avoid incorrect unit usage. ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { Review Comment: Why is the loop needed? Does `future.get` not respect the timeout? -- This is an automated message from the Apache Git
[jira] [Updated] (KAFKA-14658) When listening on fixed ports, defer port opening until we're ready
[ https://issues.apache.org/jira/browse/KAFKA-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-14658: - Description: When we are listening on fixed ports, we should defer opening ports until we're ready to accept traffic. If we open the broker port too early, it can confuse monitoring and deployment systems. This is a particular concern when in KRaft mode, since in that mode, we create the SocketServer object earlier in the startup process than when in ZK mode. The approach taken in this PR is to defer opening the acceptor port until Acceptor.start is called. Note that when we are listening on a random port, we continue to open the port "early," in the SocketServer constructor. The reason for doing this is that there is no other way to find the random port number the kernel has selected. Since random port assignment is not used in production deployments, this should be reasonable. was: We should not open the ports on the broker until we are ready to accept traffic. This is a particular concern when in KRaft mode, since in that mode, we create the SocketServer object earlier in the startup process than when in ZK mode. The approach taken in this PR is to defer opening the acceptor port until Acceptor.start is called. There is one exception to this rule, however: when we are binding to a random port (that is, binding to "port 0"), we open the port > When listening on fixed ports, defer port opening until we're ready > --- > > Key: KAFKA-14658 > URL: https://issues.apache.org/jira/browse/KAFKA-14658 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > > When we are listening on fixed ports, we should defer opening ports until > we're ready to accept traffic. If we open the broker port too early, it can > confuse monitoring and deployment systems. This is a particular concern when > in KRaft mode, since in that mode, we create the SocketServer object earlier > in the startup process than when in ZK mode. > The approach taken in this PR is to defer opening the acceptor port until > Acceptor.start is called. Note that when we are listening on a random port, > we continue to open the port "early," in the SocketServer constructor. The > reason for doing this is that there is no other way to find the random port > number the kernel has selected. Since random port assignment is not used in > production deployments, this should be reasonable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14658) Do not open broker ports until we are ready to accept traffic
[ https://issues.apache.org/jira/browse/KAFKA-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-14658: - Description: We should not open the ports on the broker until we are ready to accept traffic. This is a particular concern when in KRaft mode, since in that mode, we create the SocketServer object earlier in the startup process than when in ZK mode. The approach taken in this PR is to defer opening the acceptor port until Acceptor.start is called. There is one exception to this rule, however: when we are binding to a random port (that is, binding to "port 0"), we open the port was:We should not open the ports on the broker until we are ready to accept traffic. This is a particular concern when in KRaft mode, since in that mode, we create the SocketServer object earlier in the startup process than when in ZK mode. > Do not open broker ports until we are ready to accept traffic > - > > Key: KAFKA-14658 > URL: https://issues.apache.org/jira/browse/KAFKA-14658 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > > We should not open the ports on the broker until we are ready to accept > traffic. This is a particular concern when in KRaft mode, since in that mode, > we create the SocketServer object earlier in the startup process than when in > ZK mode. > The approach taken in this PR is to defer opening the acceptor port until > Acceptor.start is called. There is one exception to this rule, however: when > we are binding to a random port (that is, binding to "port 0"), we open the > port -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14658) When listening on fixed ports, defer port opening until we're ready
[ https://issues.apache.org/jira/browse/KAFKA-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-14658: - Summary: When listening on fixed ports, defer port opening until we're ready (was: Do not open broker ports until we are ready to accept traffic) > When listening on fixed ports, defer port opening until we're ready > --- > > Key: KAFKA-14658 > URL: https://issues.apache.org/jira/browse/KAFKA-14658 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > > We should not open the ports on the broker until we are ready to accept > traffic. This is a particular concern when in KRaft mode, since in that mode, > we create the SocketServer object earlier in the startup process than when in > ZK mode. > The approach taken in this PR is to defer opening the acceptor port until > Acceptor.start is called. There is one exception to this rule, however: when > we are binding to a random port (that is, binding to "port 0"), we open the > port -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14658) Do not open broker ports until we are ready to accept traffic
Colin McCabe created KAFKA-14658: Summary: Do not open broker ports until we are ready to accept traffic Key: KAFKA-14658 URL: https://issues.apache.org/jira/browse/KAFKA-14658 Project: Kafka Issue Type: Bug Reporter: Colin McCabe Assignee: Colin McCabe We should not open the ports on the broker until we are ready to accept traffic. This is a particular concern when in KRaft mode, since in that mode, we create the SocketServer object earlier in the startup process than when in ZK mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: Port opening fix
cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1089236363 ## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.math.BigInteger; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; + + +public class FutureUtils { +/** + * Based on the current time and a delay, computes a monotonic deadline in the future. + * + * @param nowNs The current time in monotonic nanoseconds. + * @param delayMs The delay in milliseconds. + * @return The monotonic deadline in the future. This value is capped at + * Long.MAX_VALUE. + */ +public static long getDeadlineNsFromDelayMs( +long nowNs, +long delayMs +) { +if (delayMs < 0) { +throw new RuntimeException("Negative delays are not allowed."); +} +BigInteger delayNs = BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000)); Review Comment: two reasons `System.nanotime` may return negative values. Subtracting a negative value from the largest possible 64-bit integer will result in overflow if done with 64-bit math. Additionally, of course, the configuraiton key is in milliseconds, whereas we care about nanoseconds, so we would immediately overflow without using `BigInteger` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: Port opening fix
cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1089233294 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { +long nowNs = nanoseconds(); +if (deadlineNs <= nowNs) { +throw (timeoutException == null) ? new TimeoutException() : timeoutException; +} +long deltaNs = deadlineNs - nowNs; +try { +return future.get(deltaNs, TimeUnit.NANOSECONDS); Review Comment: it will not work if we are already past the deadline -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced
[ https://issues.apache.org/jira/browse/KAFKA-14657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681416#comment-17681416 ] Justine Olshan commented on KAFKA-14657: Is the issue here that it appears the command failed? The fencing actually taking place is fine, its just unclear it worked because of the error? > Admin.fenceProducers fails when Producer has ongoing transaction - but > Producer gets fenced > --- > > Key: KAFKA-14657 > URL: https://issues.apache.org/jira/browse/KAFKA-14657 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Edoardo Comar >Priority: Major > Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java > > > Admin.fenceProducers() > fails with a ConcurrentTransactionsException if invoked when a Producer has a > transaction ongoing. > However, further attempts by that producer to produce fail with > InvalidProducerEpochException and the producer is not re-usable, > cannot abort/commit as it is fenced. > An InvalidProducerEpochException is also logged as error on the broker > [2023-01-27 17:16:32,220] ERROR [ReplicaManager broker=1] Error processing > append operation on partition topic-0 (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.InvalidProducerEpochException: Epoch of > producer 1062 at offset 84 in topic-0 is 0, which is smaller than the last > seen epoch > > Conversely, if Admin.fenceProducers() > is invoked while there is no open transaction, the call succeeds and further > attempts by that producer to produce fail with ProducerFenced. > see attached snippets > As the caller of Admin.fenceProducers() is likely unaware of the producers > state, the call should succeed regardless -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison opened a new pull request, #13170: MINOR: Remove unused methods in CoreUtils
mimaison opened a new pull request, #13170: URL: https://github.com/apache/kafka/pull/13170 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced
[ https://issues.apache.org/jira/browse/KAFKA-14657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-14657: -- Description: Admin.fenceProducers() fails with a ConcurrentTransactionsException if invoked when a Producer has a transaction ongoing. However, further attempts by that producer to produce fail with InvalidProducerEpochException and the producer is not re-usable, cannot abort/commit as it is fenced. An InvalidProducerEpochException is also logged as error on the broker [2023-01-27 17:16:32,220] ERROR [ReplicaManager broker=1] Error processing append operation on partition topic-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.InvalidProducerEpochException: Epoch of producer 1062 at offset 84 in topic-0 is 0, which is smaller than the last seen epoch Conversely, if Admin.fenceProducers() is invoked while there is no open transaction, the call succeeds and further attempts by that producer to produce fail with ProducerFenced. see attached snippets As the caller of Admin.fenceProducers() is likely unaware of the producers state, the call should succeed regardless was: Admin.fenceProducers() fails with a ConcurrentTransactionsException if invoked when a Producer has a transaction ongoing. However, further attempts by that producer to produce fail with InvalidProducerEpochException and the producer is not re-usable, cannot abort/commit as it is fenced. Conversely, if Admin.fenceProducers() is invoked while there is no open transaction, the call succeeds and further attempts by that producer to produce fail with ProducerFenced. see attached snippets As the caller of Admin.fenceProducers() is likely unaware of the producers state, the call should succeed regardless > Admin.fenceProducers fails when Producer has ongoing transaction - but > Producer gets fenced > --- > > Key: KAFKA-14657 > URL: https://issues.apache.org/jira/browse/KAFKA-14657 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Edoardo Comar >Priority: Major > Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java > > > Admin.fenceProducers() > fails with a ConcurrentTransactionsException if invoked when a Producer has a > transaction ongoing. > However, further attempts by that producer to produce fail with > InvalidProducerEpochException and the producer is not re-usable, > cannot abort/commit as it is fenced. > An InvalidProducerEpochException is also logged as error on the broker > [2023-01-27 17:16:32,220] ERROR [ReplicaManager broker=1] Error processing > append operation on partition topic-0 (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.InvalidProducerEpochException: Epoch of > producer 1062 at offset 84 in topic-0 is 0, which is smaller than the last > seen epoch > > Conversely, if Admin.fenceProducers() > is invoked while there is no open transaction, the call succeeds and further > attempts by that producer to produce fail with ProducerFenced. > see attached snippets > As the caller of Admin.fenceProducers() is likely unaware of the producers > state, the call should succeed regardless -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced
[ https://issues.apache.org/jira/browse/KAFKA-14657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-14657: -- Description: Admin.fenceProducers() fails with a ConcurrentTransactionsException if invoked when a Producer has a transaction ongoing. However, further attempts by that producer to produce fail with InvalidProducerEpochException and the producer is not re-usable, cannot abort/commit as it is fenced. Conversely, if Admin.fenceProducers() is invoked while there is no open transaction, the call succeeds and further attempts by that producer to produce fail with ProducerFenced. see attached snippets As the caller of Admin.fenceProducers() is likely unaware of the producers state, the call should succeed regardless was: {{Admin.fenceProducers() }} fails with a ConcurrentTransactionsException if invoked when a Producer has a transaction ongoing. However, further attempts by that producer to produce fail with InvalidProducerEpochException and the producer is not re-usable, cannot abort/commit as it is fenced. Conversely, if {{Admin.fenceProducers() }} is invoked while there is no open transaction, the call succeeds and further attempts by that producer to produce fail with ProducerFenced. see attached snippets As the caller of {{Admin.fenceProducers() }} the call should succeed regardless of the state of the producer > Admin.fenceProducers fails when Producer has ongoing transaction - but > Producer gets fenced > --- > > Key: KAFKA-14657 > URL: https://issues.apache.org/jira/browse/KAFKA-14657 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Edoardo Comar >Priority: Major > Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java > > > Admin.fenceProducers() > fails with a ConcurrentTransactionsException if invoked when a Producer has a > transaction ongoing. > However, further attempts by that producer to produce fail with > InvalidProducerEpochException and the producer is not re-usable, > cannot abort/commit as it is fenced. > Conversely, if Admin.fenceProducers() > is invoked while there is no open transaction, the call succeeds and further > attempts by that producer to produce fail with ProducerFenced. > see attached snippets > As the caller of Admin.fenceProducers() is likely unaware of the producers > state, the call should succeed regardless -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced
Edoardo Comar created KAFKA-14657: - Summary: Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced Key: KAFKA-14657 URL: https://issues.apache.org/jira/browse/KAFKA-14657 Project: Kafka Issue Type: Bug Components: admin Reporter: Edoardo Comar Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java {{Admin.fenceProducers() }} fails with a ConcurrentTransactionsException if invoked when a Producer has a transaction ongoing. However, further attempts by that producer to produce fail with InvalidProducerEpochException and the producer is not re-usable, cannot abort/commit as it is fenced. Conversely, if {{Admin.fenceProducers() }} is invoked while there is no open transaction, the call succeeds and further attempts by that producer to produce fail with ProducerFenced. see attached snippets As the caller of {{Admin.fenceProducers() }} the call should succeed regardless of the state of the producer -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-5613) Deprecate JmxTool?
[ https://issues.apache.org/jira/browse/KAFKA-5613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681405#comment-17681405 ] Federico Valeri commented on KAFKA-5613: FYI, there is an open PR to migrate/convert the JmxTool as part of KAFKA-14525. [https://github.com/apache/kafka/pull/13136] > Deprecate JmxTool? > -- > > Key: KAFKA-5613 > URL: https://issues.apache.org/jira/browse/KAFKA-5613 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Ewen Cheslack-Postava >Priority: Major > > According to git-blame, JmxTool has been around since October 2011. We use it > in system tests, but we are thinking it might be best to replace it: > https://issues.apache.org/jira/browse/KAFKA-5612 > When making modifications for system tests, we've had to take into account > compatibility because this tool is technically included in our distribution > and, perhaps unintentionally, a public utility. > We know that "real" tools for JMX, like jmxtrans, are more commonly used, but > we don't know who might be using JmxTool simply because it ships with Kafka. > That said, it also isn't documented in the Kafka documentation, so you > probably have to dig around to find it. > Hopefully we can deprecate this and eventually move it either to a jar that > is only used for system tests, or even better just remove it entirely. To do > any of this, we'd probably need to do at least a cursory survey of the > community to get a feel for usage level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14615) Use sizeCompare(Iterable[_]) to compare two iterables
[ https://issues.apache.org/jira/browse/KAFKA-14615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681404#comment-17681404 ] Federico Valeri edited comment on KAFKA-14615 at 1/27/23 4:49 PM: -- FYI, there is an open PR to migrate/convert the JmxTool as part of KAFKA-14525. [https://github.com/apache/kafka/pull/13136] was (Author: fvaleri): FYI, there is an open PR to migrate/convert the JmxTool: https://github.com/apache/kafka/pull/13136 > Use sizeCompare(Iterable[_]) to compare two iterables > - > > Key: KAFKA-14615 > URL: https://issues.apache.org/jira/browse/KAFKA-14615 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Assignee: Kamalesh Palanisamy >Priority: Minor > Labels: Newbie, newbie > Fix For: 4.0.0 > > > Since Scala 2.12 is being deprecated in 4.x version, we can utilize some > improved methods for comparing size of scala collections which were > introduced starting 2.13. > This task is to find and replace usage of code paths where we use > (IterableA.size == IterableB.size) having a complexity of O(IterableA size + > IterableB size) with sizeCompare() method which has a complexity of > O(min(IterableA size, IterableB size)) > Some examples where sizeCompare() could be used are: > 1. > [https://github.com/apache/kafka/blob/78d4458b94e585bc602a4ae307d3de54fcedf2af/core/src/main/scala/kafka/server/KafkaApis.scala#L1177] > 2. > [https://github.com/apache/kafka/blob/78d4458b94e585bc602a4ae307d3de54fcedf2af/core/src/main/scala/kafka/tools/JmxTool.scala#L215] > > 3. > [1] [https://github.com/scala/scala/pull/6758] > [2] [https://github.com/scala/scala/pull/6950] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14615) Use sizeCompare(Iterable[_]) to compare two iterables
[ https://issues.apache.org/jira/browse/KAFKA-14615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681404#comment-17681404 ] Federico Valeri commented on KAFKA-14615: - FYI, there is an open PR to migrate/convert the JmxTool: https://github.com/apache/kafka/pull/13136 > Use sizeCompare(Iterable[_]) to compare two iterables > - > > Key: KAFKA-14615 > URL: https://issues.apache.org/jira/browse/KAFKA-14615 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Assignee: Kamalesh Palanisamy >Priority: Minor > Labels: Newbie, newbie > Fix For: 4.0.0 > > > Since Scala 2.12 is being deprecated in 4.x version, we can utilize some > improved methods for comparing size of scala collections which were > introduced starting 2.13. > This task is to find and replace usage of code paths where we use > (IterableA.size == IterableB.size) having a complexity of O(IterableA size + > IterableB size) with sizeCompare() method which has a complexity of > O(min(IterableA size, IterableB size)) > Some examples where sizeCompare() could be used are: > 1. > [https://github.com/apache/kafka/blob/78d4458b94e585bc602a4ae307d3de54fcedf2af/core/src/main/scala/kafka/server/KafkaApis.scala#L1177] > 2. > [https://github.com/apache/kafka/blob/78d4458b94e585bc602a4ae307d3de54fcedf2af/core/src/main/scala/kafka/tools/JmxTool.scala#L215] > > 3. > [1] [https://github.com/scala/scala/pull/6758] > [2] [https://github.com/scala/scala/pull/6950] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-7666) KIP-391: Allow Producing with Offsets for Cluster Replication
[ https://issues.apache.org/jira/browse/KAFKA-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar resolved KAFKA-7666. -- Resolution: Won't Fix KIP has been retired > KIP-391: Allow Producing with Offsets for Cluster Replication > - > > Key: KAFKA-7666 > URL: https://issues.apache.org/jira/browse/KAFKA-7666 > Project: Kafka > Issue Type: New Feature >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > > Implementing KIP-391 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-391%3A+Allow+Producing+with+Offsets+for+Cluster+Replication -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals
dpcollins-google commented on code in PR #13162: URL: https://github.com/apache/kafka/pull/13162#discussion_r1089108841 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel, * @param length The number of bytes to write * @throws IOException For any errors writing to the output */ -public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException { +public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException { if (buffer.hasArray()) { out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length); } else { -int pos = buffer.position(); -for (int i = pos; i < length + pos; i++) -out.writeByte(buffer.get(i)); +Channels.newChannel(out).write(buffer); Review Comment: 1) Yes, per the docs of WritableByteChannel ``` Writes a sequence of bytes to this channel from the given buffer. An attempt is made to write up to r bytes to the channel, where r is the number of bytes remaining in the buffer, that is, src.remaining(), at the moment this method is invoked. Suppose that a byte sequence of length n is written, where 0 <= n <= r. This byte sequence will be transferred from the buffer starting at index p, where p is the buffer's position at the moment this method is invoked; the index of the last byte written will be p + n - 1. Upon return the buffer's position will be equal to p + n; its limit will not have changed. ``` 2) Good point, although I don't think this has an effect on any of the 4 current users (DefaultRecord.writeTo and LegacyRecord.writeTo for writing key and value), I've added a defensive call to asReadOnlyBuffer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14584) Move StateChangeLogMerger to tools
[ https://issues.apache.org/jira/browse/KAFKA-14584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri reassigned KAFKA-14584: --- Assignee: Federico Valeri > Move StateChangeLogMerger to tools > -- > > Key: KAFKA-14584 > URL: https://issues.apache.org/jira/browse/KAFKA-14584 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Federico Valeri >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals
Hangleton commented on code in PR #13162: URL: https://github.com/apache/kafka/pull/13162#discussion_r1088967227 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel, * @param length The number of bytes to write * @throws IOException For any errors writing to the output */ -public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException { +public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException { if (buffer.hasArray()) { out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length); } else { -int pos = buffer.position(); -for (int i = pos; i < length + pos; i++) -out.writeByte(buffer.get(i)); +Channels.newChannel(out).write(buffer); Review Comment: - Is this preserving the same behaviour i.e. copy in the range `[pos, pos + length]`? - This is mutating the position of the `buffer`, as opposed to the current implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14591) Move DeleteRecordsCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-14591: -- Assignee: Mickael Maison > Move DeleteRecordsCommand to tools > -- > > Key: KAFKA-14591 > URL: https://issues.apache.org/jira/browse/KAFKA-14591 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-9975) KIP-611: Improved Handling of Abandoned Connectors and Tasks
[ https://issues.apache.org/jira/browse/KAFKA-9975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681320#comment-17681320 ] Sayed Mohammad Hossein Torabi commented on KAFKA-9975: -- I would like to work on this KIP > KIP-611: Improved Handling of Abandoned Connectors and Tasks > > > Key: KAFKA-9975 > URL: https://issues.apache.org/jira/browse/KAFKA-9975 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Chris Egerton >Priority: Major > Labels: needs-kip > > (To be fleshed out once > [KIP-611|https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks] > is finalized) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128
Cerchie commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1088951633 ## clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java: ## @@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) { * Waits if necessary for this future to complete, and then returns its result. */ @Override -public T get() throws InterruptedException, ExecutionException { +public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Review Comment: That makes sense! I think this change might be in here by accident -- I thought I had done that. Thanks for pointing that out. I'll remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13169: Port opening fix
Hangleton commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1088918527 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { +long nowNs = nanoseconds(); +if (deadlineNs <= nowNs) { +throw (timeoutException == null) ? new TimeoutException() : timeoutException; +} +long deltaNs = deadlineNs - nowNs; +try { +return future.get(deltaNs, TimeUnit.NANOSECONDS); Review Comment: Naive question - what is incorrect with ``` default T waitForFuture( CompletableFuture future, long deadlineNs ) throws TimeoutException, InterruptedException, ExecutionException { return future.get(Math.max(0, deadlineNs - nanoseconds())) } ``` ## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.math.BigInteger; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; + + +public class FutureUtils { +/** + * Based on the current time and a delay, computes a monotonic deadline in the future. + * + * @param nowNs The current time in monotonic nanoseconds. + * @param delayMs The delay in milliseconds. + * @return The monotonic deadline in the future. This value is capped at + * Long.MAX_VALUE. + */ +public static long getDeadlineNsFromDelayMs( +long nowNs, +long delayMs +) { +if (delayMs < 0) { +throw new RuntimeException("Negative delays are not allowed."); +} +BigInteger delayNs = BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000)); Review Comment: Curious - which use cases result in overflow of the long values in this context? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13161: Kafka 14128
Hangleton commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1088898259 ## clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java: ## @@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) { * Waits if necessary for this future to complete, and then returns its result. */ @Override -public T get() throws InterruptedException, ExecutionException { +public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Review Comment: Apologies if I misread, why not use the method `get` with timeout line 177? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools
fvaleri commented on PR #13136: URL: https://github.com/apache/kafka/pull/13136#issuecomment-1406316345 @mimaison I've rebased and added some tests. This is now ready for another review. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes
mimaison commented on PR #13060: URL: https://github.com/apache/kafka/pull/13060#issuecomment-1406261073 Thanks for the PR! We're moving JmxTool to the tools module in https://github.com/apache/kafka/pull/13136. Let's do the rewrite/move first, then I'll take a look at this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org