[GitHub] [kafka] vamossagar12 commented on pull request #13127: KAFKA-14586: Moving StreamResetter to tools

2023-01-27 Thread via GitHub


vamossagar12 commented on PR #13127:
URL: https://github.com/apache/kafka/pull/13127#issuecomment-1407272563

   > @vamossagar12 Thanks for the PR!
   > 
   > I had one major comment.
   
   Thanks @cadonna. I think that comment has now been addressed based on the 
conversation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: KAFKA-14586: Moving StreamResetter to tools

2023-01-27 Thread via GitHub


vamossagar12 commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1089609015


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   Hi @cadonna, the PR for moving `CommandLineUtils` has been [merged to 
trunk](https://github.com/apache/kafka/pull/13131), so I no longer need to 
include this dependency. I have updated the PR accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-27 Thread via GitHub


vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1407268172

   > EndToEndLatencyService
   
   Thanks @mimaison. Actually, I lack some context here. The class 
`TestEndToEndLatency` was renamed to `EndToEndLatency` in this very old PR: 
https://github.com/apache/kafka/commit/e43c9aff92c57da6abb0c1d0af3431a550110a89#diff-52dbfa7ab683a53b91a84c35f309b56ff1b2a1cd94e4ccb86c5843e9e44a050f
 and in a subsequent PR the support for zk_connect was removed. So the line 
you highlighted above may not need to be changed. Also, there is no reference to 
`TestEndToEndLatency` anywhere in trunk, so I am assuming it only shipped with 
older versions (< 0.9)?
   
   Were you referring to this line instead? 
https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/end_to_end_latency.py#L127
   which is invoked on line #91? I agree that one needs to be changed. Please 
let me know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-01-27 Thread Chris Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Beard updated KAFKA-14659:

Priority: Minor  (was: Major)

> source-record-write-[rate|total] metrics include filtered records
> -
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Beard
>Priority: Minor
>
> Source tasks in Kafka Connect offer two sets of metrics (documented in 
> [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
> ||Metric||Description||
> |source-record-poll-rate|The average per-second number of records 
> produced/polled (before transformation) by this task belonging to the named 
> source connector in this worker.|
> |source-record-write-rate|The average per-second number of records output 
> from the transformations and written to Kafka for this task belonging to the 
> named source connector in this worker. This is after transformations are 
> applied and excludes any records filtered out by the transformations.|
> There are also corresponding "-total" metrics that capture the total number 
> of records polled and written for the metrics above, respectively.
> In short, the "poll" metrics capture the number of messages sourced 
> pre-transformation/filtering, and the "write" metrics should capture the 
> number of messages ultimately written to Kafka post-transformation/filtering. 
> However, the implementation of the {{source-record-write-*}}  metrics 
> _includes_ records filtered out by transformations (and also records that 
> result in produce failures with the config {{{}errors.tolerance=all{}}}).
> h3. Details
> In 
> [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
>  each source record is passed through the transformation chain where it is 
> potentially filtered out, checked to see if it was in fact filtered out, and 
> if so it is accounted for in the internal metrics via 
> {{{}counter.skipRecord(){}}}.
> {code:java}
> for (final SourceRecord preTransformRecord : toSend) {
>     retryWithToleranceOperator.sourceRecord(preTransformRecord);
>     final SourceRecord record = transformationChain.apply(preTransformRecord);
>     final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
>     if (producerRecord == null || retryWithToleranceOperator.failed()) {
>         counter.skipRecord();
>         recordDropped(preTransformRecord);
>         continue;
>     }
>     ...
> {code}
> {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
> {code:java}
> public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
>     assert batchSize > 0;
>     assert metricsGroup != null;
>     this.batchSize = batchSize;
>     counter = batchSize;
>     this.metricsGroup = metricsGroup;
> }
>
> public void skipRecord() {
>     if (counter > 0 && --counter == 0) {
>         finishedAllWrites();
>     }
> }
>
> private void finishedAllWrites() {
>     if (!completed) {
>         metricsGroup.recordWrite(batchSize - counter);
>         completed = true;
>     }
> }
> {code}
> For example: if a batch starts with 100 records, {{batchSize}} and 
> {{counter}} will both be initialized to 100. If all 100 records get filtered 
> out, {{counter}} will be decremented 100 times, and {{finishedAllWrites()}} will 
> record the value 100 to the underlying {{source-record-write-*}} metrics rather 
> than 0, which is the correct value according to the documentation for these 
> metrics.
> h3. Solutions
> Assuming the documentation correctly captures the intent of the 
> {{source-record-write-*}}  metrics, it seems reasonable to fix these metrics 
> such that filtered records do not get counted.
> It may also be useful to add additional metrics to capture the rate and total 
> number of records filtered out by transformations, which would require a KIP.
> I'm not sure what the best way of accounting for produce failures in the case 
> of {{errors.tolerance=all}} is yet. Maybe these failures deserve their own 
> new metrics?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12225) Unexpected broker bottleneck when scaling producers

2023-01-27 Thread Chris Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Beard updated KAFKA-12225:

Priority: Major  (was: Minor)

> Unexpected broker bottleneck when scaling producers
> ---
>
> Key: KAFKA-12225
> URL: https://issues.apache.org/jira/browse/KAFKA-12225
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
> Environment: AWS Based
> 5-node cluster running on k8s with EBS attached disks (HDD)
> Kafka Version 2.5.0
> Multiple Producers (KafkaStreams, Akka Streams, golang Sarama)
>Reporter: Harel Ben Attia
>Priority: Major
>
>  
> *TLDR*: There seems to be major lock contention that can occur on 
> *{{Log.lock}}* during producer scaling when produce-request sending is 
> time-based ({{linger.ms}}) rather than data-size based (max batch size).
> Hi,
> We're running a 5-node Kafka cluster on one of our production systems on AWS. 
> Recently, we have started to notice that as our producer services scale out, 
> the Kafka idle-percentage drops abruptly from ~70% idle percentage to 0% on 
> all brokers, even though none of the physical resources of the brokers are 
> exhausted.
> Initially, we realised that our {{io.thread}} count was too low, causing high 
> request queuing and the low idle percentage, so we have increased it, hoping 
> to see one of the physical resources maxing out. After changing it we still 
> continued to see abrupt drops of the idle-percentage to 0% (with no physical 
> resource maxing out), so we continued to investigate.
> The investigation has shown that there's a direct relation to {{linger.ms}} 
> being the controlling factor of sending out produce requests. Whenever 
> messages are being sent out from the producer due to the {{linger.ms}} 
> threshold, scaling out the service increased the number of produce requests 
> in a way which is not proportional to our traffic increase, bringing down all 
> the brokers to a near-halt in terms of being able to process requests and, as 
> mentioned, without any exhaustion of physical resources.
> After some more experiments and profiling a broker through flight recorder, 
> we have found out that the cause of the issue is a lock contention on a 
> *{{java.lang.Object}}*, wasting a lot of time on all the 
> {{data-plane-kafka-request-handler}} threads. 90% of the locks were on Log's 
> *{{lock: Object}}* instance, inside the *{{Log.append()}}* method. The stack 
> traces show that these locks occur during the {{handleProduceRequest}} 
> method. We have ruled out replication as the source of the issues, as there 
> were no replication issues, and the control-plane has a separate thread pool, 
> so this focused us back on the actual producers, leading back to the 
> behaviour of our producer service when scaling out.
> At that point we thought that maybe the issue is related to the number of 
> partitions of the topic (60 currently), and increasing it would reduce the 
> lock contention on each {{Log}} instance, but since each producer writes to 
> all partitions (data is evenly spread and not skewed), then increasing the 
> number of partitions would only cause each producer to generate more 
> produce-requests, not alleviating the lock contention. Also, increasing the 
> number of brokers would increase the idle percentage per broker, but 
> essentially would not help reducing the produce-request latency, since this 
> would not change the rate of produce-requests per Log.
> Eventually, we've worked around the issue by making the {{linger.ms}} value 
> high enough so it stopped being the controlling factor of sending messages 
> (e.g. produce-requests became coupled to the size of the traffic due to the 
> max batch size becoming the controlling factor). This allowed us to utilise 
> the cluster better without upscaling it.
> From our analysis, it seems that this lock behaviour limits Kafka's ability 
> to be robust to producer configuration and scaling, and hurts the ability to 
> do efficient capacity planning for the cluster, increasing the risk of an 
> unexpected bottleneck when traffic increases.
> It would be great if you can validate these conclusions, or provide any more 
> information that will help us understand the issue better or work around it 
> in a more efficient way.
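
For illustration, the workaround described above amounts to producer settings along 
these lines (values are assumptions for the sketch, not the reporter's actual 
configuration):
{code:java}
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Make linger.ms large enough that it stops being the controlling factor, so
// produce requests are sent when batches fill (batch.size) rather than on a timer.
props.put(ProducerConfig.LINGER_MS_CONFIG, "1000");      // assumed value
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "262144");   // assumed value (256 KiB)
Producer<String, String> producer = new KafkaProducer<>(props);
{code}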



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12225) Unexpected broker bottleneck when scaling producers

2023-01-27 Thread Chris Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Beard updated KAFKA-12225:

Priority: Minor  (was: Major)

> Unexpected broker bottleneck when scaling producers
> ---
>
> Key: KAFKA-12225
> URL: https://issues.apache.org/jira/browse/KAFKA-12225
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
> Environment: AWS Based
> 5-node cluster running on k8s with EBS attached disks (HDD)
> Kafka Version 2.5.0
> Multiple Producers (KafkaStreams, Akka Streams, golang Sarama)
>Reporter: Harel Ben Attia
>Priority: Minor
>
>  
> *TLDR*: There seems to be major lock contention that can occur on 
> *{{Log.lock}}* during producer scaling when produce-request sending is 
> time-based ({{linger.ms}}) rather than data-size based (max batch size).
> Hi,
> We're running a 5-node Kafka cluster on one of our production systems on AWS. 
> Recently, we have started to notice that as our producer services scale out, 
> the Kafka idle-percentage drops abruptly from ~70% idle percentage to 0% on 
> all brokers, even though none of the physical resources of the brokers are 
> exhausted.
> Initially, we realised that our {{io.thread}} count was too low, causing high 
> request queuing and the low idle percentage, so we have increased it, hoping 
> to see one of the physical resources maxing out. After changing it we still 
> continued to see abrupt drops of the idle-percentage to 0% (with no physical 
> resource maxing out), so we continued to investigate.
> The investigation has shown that there's a direct relation to {{linger.ms}} 
> being the controlling factor of sending out produce requests. Whenever 
> messages are being sent out from the producer due to the {{linger.ms}} 
> threshold, scaling out the service increased the number of produce requests 
> in a way which is not proportional to our traffic increase, bringing down all 
> the brokers to a near-halt in terms of being able to process requests and, as 
> mentioned, without any exhaustion of physical resources.
> After some more experiments and profiling a broker through flight recorder, 
> we have found out that the cause of the issue is a lock contention on a 
> *{{java.lang.Object}}*, wasting a lot of time on all the 
> {{data-plane-kafka-request-handler}} threads. 90% of the locks were on Log's 
> *{{lock: Object}}* instance, inside the *{{Log.append()}}* method. The stack 
> traces show that these locks occur during the {{handleProduceRequest}} 
> method. We have ruled out replication as the source of the issues, as there 
> were no replication issues, and the control-plane has a separate thread pool, 
> so this focused us back on the actual producers, leading back to the 
> behaviour of our producer service when scaling out.
> At that point we thought that maybe the issue is related to the number of 
> partitions of the topic (60 currently), and increasing it would reduce the 
> lock contention on each {{Log}} instance, but since each producer writes to 
> all partitions (data is evenly spread and not skewed), then increasing the 
> number of partitions would only cause each producer to generate more 
> produce-requests, not alleviating the lock contention. Also, increasing the 
> number of brokers would increase the idle percentage per broker, but 
> essentially would not help reducing the produce-request latency, since this 
> would not change the rate of produce-requests per Log.
> Eventually, we've worked around the issue by making the {{linger.ms}} value 
> high enough so it stopped being the controlling factor of sending messages 
> (e.g. produce-requests became coupled to the size of the traffic due to the 
> max batch size becoming the controlling factor). This allowed us to utilise 
> the cluster better without upscaling it.
> From our analysis, it seems that this lock behaviour limits Kafka's ability 
> to be robust to producer configuration and scaling, and hurts the ability to 
> do efficient capacity planning for the cluster, increasing the risk of an 
> unexpected bottleneck when traffic increases.
> It would be great if you can validate these conclusions, or provide any more 
> information that will help us understand the issue better or work around it 
> in a more efficient way.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-01-27 Thread Chris Beard (Jira)
Chris Beard created KAFKA-14659:
---

 Summary: source-record-write-[rate|total] metrics include filtered 
records
 Key: KAFKA-14659
 URL: https://issues.apache.org/jira/browse/KAFKA-14659
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Beard


Source tasks in Kafka Connect offer two sets of metrics (documented in 
[ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
||Metric||Description||
|source-record-poll-rate|The average per-second number of records 
produced/polled (before transformation) by this task belonging to the named 
source connector in this worker.|
|source-record-write-rate|The average per-second number of records output from 
the transformations and written to Kafka for this task belonging to the named 
source connector in this worker. This is after transformations are applied and 
excludes any records filtered out by the transformations.|

There are also corresponding "-total" metrics that capture the total number of 
records polled and written for the metrics above, respectively.

In short, the "poll" metrics capture the number of messages sourced 
pre-transformation/filtering, and the "write" metrics should capture the number 
of messages ultimately written to Kafka post-transformation/filtering. However, 
the implementation of the {{source-record-write-*}}  metrics _includes_ records 
filtered out by transformations (and also records that result in produce 
failures with the config {{{}errors.tolerance=all{}}}).
h3. Details

In 
[AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
 each source record is passed through the transformation chain where it is 
potentially filtered out, checked to see if it was in fact filtered out, and if 
so it is accounted for in the internal metrics via {{{}counter.skipRecord(){}}}.
{code:java}
for (final SourceRecord preTransformRecord : toSend) {
    retryWithToleranceOperator.sourceRecord(preTransformRecord);
    final SourceRecord record = transformationChain.apply(preTransformRecord);
    final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
    if (producerRecord == null || retryWithToleranceOperator.failed()) {
        counter.skipRecord();
        recordDropped(preTransformRecord);
        continue;
    }
    ...
{code}
{{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
{code:java}

public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
    assert batchSize > 0;
    assert metricsGroup != null;
    this.batchSize = batchSize;
    counter = batchSize;
    this.metricsGroup = metricsGroup;
}

public void skipRecord() {
    if (counter > 0 && --counter == 0) {
        finishedAllWrites();
    }
}

private void finishedAllWrites() {
    if (!completed) {
        metricsGroup.recordWrite(batchSize - counter);
        completed = true;
    }
}
{code}
For example: if a batch starts with 100 records, {{batchSize}} and {{counter}} 
will both be initialized to 100. If all 100 records get filtered out, 
{{counter}} will be decremented 100 times, and {{finishedAllWrites()}} will 
record the value 100 to the underlying {{source-record-write-*}} metrics 
rather than 0, which is the correct value according to the documentation for 
these metrics.
h3. Solutions

Assuming the documentation correctly captures the intent of the 
{{source-record-write-*}}  metrics, it seems reasonable to fix these metrics 
such that filtered records do not get counted.
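
A minimal sketch of one possible fix along these lines (the {{skipped}} field below is 
an assumption for illustration, not an actual patch):
{code:java}
// Hypothetical sketch: count skipped records separately so that the write
// metric only reflects records that were actually written to Kafka.
private int skipped = 0;   // assumed new field in SourceRecordWriteCounter

public void skipRecord() {
    skipped += 1;
    if (counter > 0 && --counter == 0) {
        finishedAllWrites();
    }
}

private void finishedAllWrites() {
    if (!completed) {
        // exclude filtered/failed records from source-record-write-*
        metricsGroup.recordWrite(batchSize - counter - skipped);
        completed = true;
    }
}
{code}
With a change of this shape, a batch of 100 records that is entirely filtered out would 
record 0 writes instead of 100.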

It may also be useful to add additional metrics to capture the rate and total 
number of records filtered out by transformations, which would require a KIP.

I'm not sure what the best way of accounting for produce failures in the case 
of {{errors.tolerance=all}} is yet. Maybe these failures deserve their own new 
metrics?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-01-27 Thread via GitHub


cmccabe commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1407228326

   * We need a `ScramControlManagerTest` as well


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-01-27 Thread via GitHub


cmccabe commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1407228067

   We should not be changing `BrokerMetadataSnapshotterTest.scala` here -- 
doesn't seem needed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-01-27 Thread via GitHub


cmccabe commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1407227921

   Can we have a test in `ControllerApisTest`? These are mainly to verify that 
our authorization logic is sound.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-27 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1089556608


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values>
+ * 
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is 
one that
+ * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+ * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * 
+ * This can happen if, e.g., the only record inserted for a particular key 
is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no 
validTo
+ * timestamp associated with it.

Review Comment:
   Hey @mjsax I just pushed an update to the serialization of "empty" segments 
(per your suggestion). The new serialization is `minTimestamp = nextTimestamp` 
with a single record version stored in the record versions list (tombstone 
value and timestamp equal to both `minTimestamp` and `nextTimestamp`). So, even 
though it's still a special case which cannot be combined with the regular case 
(as evidenced by the fact that the `insertAsLatest()` and `insertAsEarliest()` 
methods still have if-conditions to handle this case separately), now this 
special-casing is fully encapsulated within this value formatter utility class, 
i.e., the versioned store implementation itself (for which I'll open a PR as 
soon as this one is merged) does not need to concern itself with this special 
case (and therefore the public-facing `isEmpty()` methods have been removed 
accordingly). The updated javadocs in the latest commit contain more details.
   
   I've also renamed "empty" segment to "degenerate" segment, to clarify that 
the segment is not truly empty; it's just degenerate because the single 
tombstone in it has `validFrom = validTo`, which is not typically allowed.
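
   A small illustration of the kind of internal check this serialization enables 
(offsets follow the layout described in the javadoc above; the helper name is 
hypothetical, not code from this PR):
```java
// Assumed illustration: next_timestamp occupies the first 8 bytes and
// min_timestamp the next 8 (TIMESTAMP_SIZE == 8). A "degenerate" segment is
// one where the two are equal, i.e. its single tombstone has validFrom == validTo.
static boolean isDegenerate(final byte[] segmentValue) {
    final ByteBuffer buffer = ByteBuffer.wrap(segmentValue);
    return buffer.getLong(0) == buffer.getLong(TIMESTAMP_SIZE);
}
```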



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests

2023-01-27 Thread via GitHub


cmccabe commented on code in PR #13153:
URL: https://github.com/apache/kafka/pull/13153#discussion_r1089524563


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+    /**
+     * Wait for a future to complete, or time out.
+     *
+     * @param future        The future to wait for.
+     * @param deadlineNs    The time in the future, in monotonic nanoseconds, to time out.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     */
+    default <T> T waitForFuture(
+        CompletableFuture<T> future,
+        long deadlineNs

Review Comment:
   Can't use that here since `:clients` doesn't depend on `:server-common`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests

2023-01-27 Thread via GitHub


cmccabe commented on code in PR #13153:
URL: https://github.com/apache/kafka/pull/13153#discussion_r1089524442


##
server-common/src/main/java/org/apache/kafka/server/util/Deadline.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.math.BigInteger;
+import java.util.Date;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+
+public class Deadline {
+private final long nanoseconds;
+
+public static Deadline fromMonotonicNanoseconds(
+long nanoseconds
+) {
+return new Deadline(nanoseconds);
+}
+
+public static Deadline fromDelay(
+Time time,
+long delay,
+TimeUnit timeUnit
+) {
+return fromDelay(time.nanoseconds(), delay, timeUnit);
+}
+
+public static Deadline fromDelay(

Review Comment:
   I'll remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests

2023-01-27 Thread via GitHub


hachikuji commented on code in PR #13153:
URL: https://github.com/apache/kafka/pull/13153#discussion_r1089358150


##
server-common/src/main/java/org/apache/kafka/server/util/Deadline.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.math.BigInteger;
+import java.util.Date;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+
+public class Deadline {
+private final long nanoseconds;
+
+public static Deadline fromMonotonicNanoseconds(
+long nanoseconds
+) {
+return new Deadline(nanoseconds);
+}
+
+public static Deadline fromDelay(
+Time time,
+long delay,
+TimeUnit timeUnit
+) {
+return fromDelay(time.nanoseconds(), delay, timeUnit);
+}
+
+public static Deadline fromDelay(

Review Comment:
   Can this be private? Or if it needs to be accessible, maybe we can make it 
`fromDelayNs`



##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+    /**
+     * Wait for a future to complete, or time out.
+     *
+     * @param future        The future to wait for.
+     * @param deadlineNs    The time in the future, in monotonic nanoseconds, to time out.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     */
+    default <T> T waitForFuture(
+        CompletableFuture<T> future,
+        long deadlineNs

Review Comment:
   Does it make sense to use `Deadline` here?



##
core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala:
##
@@ -295,6 +295,8 @@ abstract class QuorumTestHarness extends Logging {
   throw new RuntimeException("Only one KRaft controller is supported for 
now.")
 }
 val props = propsList(0)
+props.setProperty(KafkaConfig.BrokerServerMaxStartupTimeMs, 
TimeUnit.MINUTES.toMillis(10).toString)

Review Comment:
   Seems like we need to update these references?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests

2023-01-27 Thread via GitHub


cmccabe commented on code in PR #13153:
URL: https://github.com/apache/kafka/pull/13153#discussion_r1089352604


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1153,6 +1157,8 @@ object KafkaConfig {
   .define(MetadataMaxRetentionBytesProp, LONG, 
Defaults.MetadataMaxRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
   .define(MetadataMaxRetentionMillisProp, LONG, 
LogConfig.DEFAULT_RETENTION_MS, null, HIGH, MetadataMaxRetentionMillisDoc)
   .define(MetadataMaxIdleIntervalMsProp, INT, 
Defaults.MetadataMaxIdleIntervalMs, atLeast(0), LOW, 
MetadataMaxIdleIntervalMsDoc)
+  .defineInternal(BrokerServerMaxStartupTimeMs, LONG, 
Defaults.BrokerServerMaxStartupTimeMs, atLeast(0), MEDIUM, 
BrokerServerMaxStartupTimeMsDoc)

Review Comment:
   yeah



##
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+
+public class FutureUtils {
+/**
+ * Based on the current time and a delay, computes a monotonic deadline in 
the future.
+ *
+ * @param nowNs The current time in monotonic nanoseconds.
+ * @param delayMs   The delay in milliseconds.
+ * @return  The monotonic deadline in the future. This value is 
capped at
+ *  Long.MAX_VALUE.
+ */
+public static long getDeadlineNsFromDelayMs(
+long nowNs,
+long delayMs
+) {
+if (delayMs < 0) {
+throw new RuntimeException("Negative delays are not allowed.");
+}
+BigInteger delayNs = 
BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000));
+BigInteger deadlineNs = BigInteger.valueOf(nowNs).add(delayNs);
+if (deadlineNs.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) >= 0) {
+return Long.MAX_VALUE;
+} else {
+return deadlineNs.longValue();
+}
+}
+
+    /**
+     * Wait for a future until a specific time in the future, with copious logging.
+     *
+     * @param log           The slf4j object to use to log success and failure.
+     * @param action        The action we are waiting for.
+     * @param future        The future we are waiting for.
+     * @param deadlineNs    The deadline in the future we are waiting for.
+     * @param time          The clock object.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     *
+     * @throws java.util.concurrent.TimeoutException If the future times out.
+     * @throws Throwable If the future fails. Note: we unwrap ExecutionException here.
+     */
+    public static <T> T waitWithLogging(
+        Logger log,
+        String action,
+        CompletableFuture<T> future,
+        long deadlineNs,

Review Comment:
   I will create a Deadline class to help with this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests

2023-01-27 Thread via GitHub


cmccabe commented on code in PR #13153:
URL: https://github.com/apache/kafka/pull/13153#discussion_r1089313497


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+    /**
+     * Wait for a future to complete, or time out.
+     *
+     * @param future        The future to wait for.
+     * @param deadlineNs    The time in the future, in monotonic nanoseconds, to time out.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     */
+    default <T> T waitForFuture(
+        CompletableFuture<T> future,
+        long deadlineNs
+    ) throws TimeoutException, InterruptedException, ExecutionException {
+        TimeoutException timeoutException = null;
+        while (true) {

Review Comment:
   It's a bit unclear. It's traditional for condition variables to allow 
spurious wakeups, and probably this is implemented with a condition variable. 
The JavaDoc for `get` just says that it will wait "at most the given time" 
which doesn't really clarify matters. In any case, the loop certainly does no 
harm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13153: MINOR: startup timeouts for KRaft integration tests

2023-01-27 Thread via GitHub


hachikuji commented on code in PR #13153:
URL: https://github.com/apache/kafka/pull/13153#discussion_r1089298321


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1153,6 +1157,8 @@ object KafkaConfig {
   .define(MetadataMaxRetentionBytesProp, LONG, 
Defaults.MetadataMaxRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
   .define(MetadataMaxRetentionMillisProp, LONG, 
LogConfig.DEFAULT_RETENTION_MS, null, HIGH, MetadataMaxRetentionMillisDoc)
   .define(MetadataMaxIdleIntervalMsProp, INT, 
Defaults.MetadataMaxIdleIntervalMs, atLeast(0), LOW, 
MetadataMaxIdleIntervalMsDoc)
+  .defineInternal(BrokerServerMaxStartupTimeMs, LONG, 
Defaults.BrokerServerMaxStartupTimeMs, atLeast(0), MEDIUM, 
BrokerServerMaxStartupTimeMsDoc)

Review Comment:
   Seems like we could get away with one internal config. Maybe 
`process.max.startup.time.ms`?



##
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+
+public class FutureUtils {
+/**
+ * Based on the current time and a delay, computes a monotonic deadline in 
the future.
+ *
+ * @param nowNs The current time in monotonic nanoseconds.
+ * @param delayMs   The delay in milliseconds.
+ * @return  The monotonic deadline in the future. This value is 
capped at
+ *  Long.MAX_VALUE.
+ */
+public static long getDeadlineNsFromDelayMs(
+long nowNs,
+long delayMs
+) {
+if (delayMs < 0) {
+throw new RuntimeException("Negative delays are not allowed.");
+}
+BigInteger delayNs = 
BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000));
+BigInteger deadlineNs = BigInteger.valueOf(nowNs).add(delayNs);
+if (deadlineNs.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) >= 0) {
+return Long.MAX_VALUE;
+} else {
+return deadlineNs.longValue();
+}
+}
+
+    /**
+     * Wait for a future until a specific time in the future, with copious logging.
+     *
+     * @param log           The slf4j object to use to log success and failure.
+     * @param action        The action we are waiting for.
+     * @param future        The future we are waiting for.
+     * @param deadlineNs    The deadline in the future we are waiting for.
+     * @param time          The clock object.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     *
+     * @throws java.util.concurrent.TimeoutException If the future times out.
+     * @throws Throwable If the future fails. Note: we unwrap ExecutionException here.
+     */
+    public static <T> T waitWithLogging(
+        Logger log,
+        String action,
+        CompletableFuture<T> future,
+        long deadlineNs,

Review Comment:
   Wonder if we could use a `Duration` object or something like that to avoid 
incorrect unit usage.



##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+    /**
+     * Wait for a future to complete, or time out.
+     *
+     * @param future        The future to wait for.
+     * @param deadlineNs    The time in the future, in monotonic nanoseconds, to time out.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     */
+    default <T> T waitForFuture(
+        CompletableFuture<T> future,
+        long deadlineNs
+    ) throws TimeoutException, InterruptedException, ExecutionException {
+        TimeoutException timeoutException = null;
+        while (true) {

Review Comment:
   Why is the loop needed? Does `future.get` not respect the timeout?



-- 
This is an automated message from the Apache Git 

[jira] [Updated] (KAFKA-14658) When listening on fixed ports, defer port opening until we're ready

2023-01-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14658:
-
Description: 
When we are listening on fixed ports, we should defer opening ports until we're 
ready to accept traffic. If we open the broker port too early, it can confuse 
monitoring and deployment systems. This is a particular concern when in KRaft 
mode, since in that mode, we create the SocketServer object earlier in the 
startup process than when in ZK mode.

The approach taken in this PR is to defer opening the acceptor port until 
Acceptor.start is called. Note that when we are listening on a random port, we 
continue to open the port "early," in the SocketServer constructor. The reason 
for doing this is that there is no other way to find the random port number the 
kernel has selected. Since random port assignment is not used in production 
deployments, this should be reasonable.

  was:
We should not open the ports on the broker until we are ready to accept 
traffic. This is a particular concern when in KRaft mode, since in that mode, 
we create the SocketServer object earlier in the startup process than when in 
ZK mode.

The approach taken in this PR is to defer opening the acceptor port until 
Acceptor.start is called. There is one exception to this rule, however: when we 
are binding to a random port (that is, binding to "port 0"), we open the port 


> When listening on fixed ports, defer port opening until we're ready
> ---
>
> Key: KAFKA-14658
> URL: https://issues.apache.org/jira/browse/KAFKA-14658
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> When we are listening on fixed ports, we should defer opening ports until 
> we're ready to accept traffic. If we open the broker port too early, it can 
> confuse monitoring and deployment systems. This is a particular concern when 
> in KRaft mode, since in that mode, we create the SocketServer object earlier 
> in the startup process than when in ZK mode.
> The approach taken in this PR is to defer opening the acceptor port until 
> Acceptor.start is called. Note that when we are listening on a random port, 
> we continue to open the port "early," in the SocketServer constructor. The 
> reason for doing this is that there is no other way to find the random port 
> number the kernel has selected. Since random port assignment is not used in 
> production deployments, this should be reasonable.
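
For illustration, the reason the random-port case must bind early is that the 
kernel-selected port is only knowable after bind(). A standalone Java NIO sketch 
(not Kafka's SocketServer code):
{code:java}
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class RandomPortExample {
    public static void main(String[] args) throws Exception {
        // Binding to port 0 asks the kernel to pick a free ephemeral port.
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.bind(new InetSocketAddress(0));
        // The chosen port is only discoverable after the bind has happened,
        // which is why sockets using random ports must be opened early.
        int port = channel.socket().getLocalPort();
        System.out.println("Kernel selected port " + port);
        channel.close();
    }
}
{code}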



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14658) Do not open broker ports until we are ready to accept traffic

2023-01-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14658:
-
Description: 
We should not open the ports on the broker until we are ready to accept 
traffic. This is a particular concern when in KRaft mode, since in that mode, 
we create the SocketServer object earlier in the startup process than when in 
ZK mode.

The approach taken in this PR is to defer opening the acceptor port until 
Acceptor.start is called. There is one exception to this rule, however: when we 
are binding to a random port (that is, binding to "port 0"), we open the port 

  was:We should not open the ports on the broker until we are ready to accept 
traffic. This is a particular concern when in KRaft mode, since in that mode, 
we create the SocketServer object earlier in the startup process than when in 
ZK mode.


> Do not open broker ports until we are ready to accept traffic
> -
>
> Key: KAFKA-14658
> URL: https://issues.apache.org/jira/browse/KAFKA-14658
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> We should not open the ports on the broker until we are ready to accept 
> traffic. This is a particular concern when in KRaft mode, since in that mode, 
> we create the SocketServer object earlier in the startup process than when in 
> ZK mode.
> The approach taken in this PR is to defer opening the acceptor port until 
> Acceptor.start is called. There is one exception to this rule, however: when 
> we are binding to a random port (that is, binding to "port 0"), we open the 
> port 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14658) When listening on fixed ports, defer port opening until we're ready

2023-01-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14658:
-
Summary: When listening on fixed ports, defer port opening until we're 
ready  (was: Do not open broker ports until we are ready to accept traffic)

> When listening on fixed ports, defer port opening until we're ready
> ---
>
> Key: KAFKA-14658
> URL: https://issues.apache.org/jira/browse/KAFKA-14658
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> We should not open the ports on the broker until we are ready to accept 
> traffic. This is a particular concern when in KRaft mode, since in that mode, 
> we create the SocketServer object earlier in the startup process than when in 
> ZK mode.
> The approach taken in this PR is to defer opening the acceptor port until 
> Acceptor.start is called. There is one exception to this rule, however: when 
> we are binding to a random port (that is, binding to "port 0"), we open the 
> port 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14658) Do not open broker ports until we are ready to accept traffic

2023-01-27 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14658:


 Summary: Do not open broker ports until we are ready to accept 
traffic
 Key: KAFKA-14658
 URL: https://issues.apache.org/jira/browse/KAFKA-14658
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe


We should not open the ports on the broker until we are ready to accept 
traffic. This is a particular concern when in KRaft mode, since in that mode, 
we create the SocketServer object earlier in the startup process than when in 
ZK mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: Port opening fix

2023-01-27 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1089236363


##
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+
+
+public class FutureUtils {
+/**
+ * Based on the current time and a delay, computes a monotonic deadline in 
the future.
+ *
+ * @param nowNs The current time in monotonic nanoseconds.
+ * @param delayMs   The delay in milliseconds.
+ * @return  The monotonic deadline in the future. This value is 
capped at
+ *  Long.MAX_VALUE.
+ */
+public static long getDeadlineNsFromDelayMs(
+long nowNs,
+long delayMs
+) {
+if (delayMs < 0) {
+throw new RuntimeException("Negative delays are not allowed.");
+}
+BigInteger delayNs = 
BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000));

Review Comment:
   Two reasons:
   
   `System.nanoTime` may return negative values. Subtracting a negative value 
from the largest possible 64-bit integer will result in overflow if done with 
64-bit math.
   
   Additionally, of course, the configuration key is in milliseconds, whereas 
we care about nanoseconds, so we would immediately overflow without using 
`BigInteger`.
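
   A small standalone sketch of the overflow being avoided (hypothetical example, 
not the PR code):
```java
public class DeadlineOverflowDemo {
    public static void main(String[] args) {
        long nowNs = System.nanoTime();                // may legitimately be negative
        long delayMs = Long.MAX_VALUE / 1_000;         // an "effectively infinite" delay in ms
        long naive = nowNs + delayMs * 1_000_000L;     // plain 64-bit math overflows here
        java.math.BigInteger capped = java.math.BigInteger.valueOf(nowNs)
            .add(java.math.BigInteger.valueOf(delayMs).multiply(java.math.BigInteger.valueOf(1_000_000)))
            .min(java.math.BigInteger.valueOf(Long.MAX_VALUE));  // cap instead of wrapping around
        System.out.println("naive = " + naive + ", capped = " + capped);
    }
}
```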



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: Port opening fix

2023-01-27 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1089233294


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+    /**
+     * Wait for a future to complete, or time out.
+     *
+     * @param future        The future to wait for.
+     * @param deadlineNs    The time in the future, in monotonic nanoseconds, to time out.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     */
+    default <T> T waitForFuture(
+        CompletableFuture<T> future,
+        long deadlineNs
+    ) throws TimeoutException, InterruptedException, ExecutionException {
+        TimeoutException timeoutException = null;
+        while (true) {
+            long nowNs = nanoseconds();
+            if (deadlineNs <= nowNs) {
+                throw (timeoutException == null) ? new TimeoutException() : timeoutException;
+            }
+            long deltaNs = deadlineNs - nowNs;
+            try {
+                return future.get(deltaNs, TimeUnit.NANOSECONDS);

Review Comment:
   it will not work if we are already past the deadline



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced

2023-01-27 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681416#comment-17681416
 ] 

Justine Olshan commented on KAFKA-14657:


Is the issue here that it appears the command failed? The fencing actually 
taking place is fine; it's just unclear that it worked because of the error?

> Admin.fenceProducers fails when Producer has ongoing transaction - but 
> Producer gets fenced
> ---
>
> Key: KAFKA-14657
> URL: https://issues.apache.org/jira/browse/KAFKA-14657
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java
>
>
> Admin.fenceProducers() 
> fails with a ConcurrentTransactionsException if invoked when a Producer has a 
> transaction ongoing.
> However, further attempts by that producer to produce fail with 
> InvalidProducerEpochException, and the producer is not re-usable: it 
> cannot abort/commit as it is fenced.
> An InvalidProducerEpochException is also logged as error on the broker
> [2023-01-27 17:16:32,220] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition topic-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Epoch of 
> producer 1062 at offset 84 in topic-0 is 0, which is smaller than the last 
> seen epoch
>  
> Conversely, if Admin.fenceProducers() 
> is invoked while there is no open transaction, the call succeeds and further 
> attempts by that producer to produce fail with ProducerFenced.
> see attached snippets
> As the caller of Admin.fenceProducers() is likely unaware of the producer's 
> state, the call should succeed regardless.
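
A minimal sketch of the failing sequence described above (the transactional id, topic, 
and configs are assumptions; the reporter's actual code is in the attached snippets):
{code:java}
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (Producer<String, String> producer = new KafkaProducer<>(producerProps);
     Admin admin = Admin.create(adminProps)) {
    producer.initTransactions();
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic", "k", "v"));

    try {
        // With the transaction still open, this fails with ConcurrentTransactionsException...
        admin.fenceProducers(Collections.singleton("my-tx-id")).all().get();
    } catch (ExecutionException e) {
        // ...but the producer has been fenced anyway.
    }

    // Further sends fail with InvalidProducerEpochException, and the transaction
    // can no longer be aborted or committed.
    producer.send(new ProducerRecord<>("topic", "k2", "v2"));
    producer.commitTransaction();
}
{code}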



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison opened a new pull request, #13170: MINOR: Remove unused methods in CoreUtils

2023-01-27 Thread via GitHub


mimaison opened a new pull request, #13170:
URL: https://github.com/apache/kafka/pull/13170

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced

2023-01-27 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-14657:
--
Description: 
Admin.fenceProducers() 
fails with a ConcurrentTransactionsException if invoked when a Producer has a 
transaction ongoing.
However, further attempts by that producer to produce fail with 
InvalidProducerEpochException and the producer is not re-usable, 
cannot abort/commit as it is fenced.

An InvalidProducerEpochException is also logged as error on the broker

[2023-01-27 17:16:32,220] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition topic-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.InvalidProducerEpochException: Epoch of producer 
1062 at offset 84 in topic-0 is 0, which is smaller than the last seen epoch

 

Conversely, if Admin.fenceProducers() 
is invoked while there is no open transaction, the call succeeds and further 
attempts by that producer to produce fail with ProducerFenced.

see attached snippets

As the caller of Admin.fenceProducers() is likely unaware of the producer's
state, the call should succeed regardless

  was:
Admin.fenceProducers() 
fails with a ConcurrentTransactionsException if invoked when a Producer has a 
transaction ongoing.
However, further attempts by that producer to produce fail with 
InvalidProducerEpochException and the producer is not re-usable, 
cannot abort/commit as it is fenced.

Conversely, if Admin.fenceProducers() 
is invoked while there is no open transaction, the call succeeds and further 
attempts by that producer to produce fail with ProducerFenced.

see attached snippets

As the caller of Admin.fenceProducers() is likely unaware of the producer's
state, the call should succeed regardless


> Admin.fenceProducers fails when Producer has ongoing transaction - but 
> Producer gets fenced
> ---
>
> Key: KAFKA-14657
> URL: https://issues.apache.org/jira/browse/KAFKA-14657
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java
>
>
> Admin.fenceProducers() 
> fails with a ConcurrentTransactionsException if invoked when a Producer has a 
> transaction ongoing.
> However, further attempts by that producer to produce fail with 
> InvalidProducerEpochException and the producer is not re-usable, 
> cannot abort/commit as it is fenced.
> An InvalidProducerEpochException is also logged as error on the broker
> [2023-01-27 17:16:32,220] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition topic-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Epoch of 
> producer 1062 at offset 84 in topic-0 is 0, which is smaller than the last 
> seen epoch
>  
> Conversely, if Admin.fenceProducers() 
> is invoked while there is no open transaction, the call succeeds and further 
> attempts by that producer to produce fail with ProducerFenced.
> see attached snippets
> As the caller of Admin.fenceProducers() is likely unaware of the producer's
> state, the call should succeed regardless



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced

2023-01-27 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-14657:
--
Description: 
Admin.fenceProducers() 
fails with a ConcurrentTransactionsException if invoked when a Producer has a 
transaction ongoing.
However, further attempts by that producer to produce fail with 
InvalidProducerEpochException and the producer is not re-usable, 
cannot abort/commit as it is fenced.

Conversely, if Admin.fenceProducers() 
is invoked while there is no open transaction, the call succeeds and further 
attempts by that producer to produce fail with ProducerFenced.

see attached snippets

As the caller of Admin.fenceProducers() is likely unaware of the producer's
state, the call should succeed regardless

  was:
{{Admin.fenceProducers() }}
fails with a ConcurrentTransactionsException if invoked when a Producer has a 
transaction ongoing.
However, further attempts by that producer to produce fail with 
InvalidProducerEpochException and the producer is not re-usable, 
cannot abort/commit as it is fenced.

Conversely, if 
{{Admin.fenceProducers() }}
is invoked while there is no open transaction, the call succeeds and further 
attempts by that producer to produce fail with ProducerFenced.

see attached snippets 

As the caller of {{Admin.fenceProducers() }} the call should succeed regardless 
of the state of the producer


> Admin.fenceProducers fails when Producer has ongoing transaction - but 
> Producer gets fenced
> ---
>
> Key: KAFKA-14657
> URL: https://issues.apache.org/jira/browse/KAFKA-14657
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java
>
>
> Admin.fenceProducers() 
> fails with a ConcurrentTransactionsException if invoked when a Producer has a 
> transaction ongoing.
> However, further attempts by that producer to produce fail with 
> InvalidProducerEpochException and the producer is not re-usable, 
> cannot abort/commit as it is fenced.
> Conversely, if Admin.fenceProducers() 
> is invoked while there is no open transaction, the call succeeds and further 
> attempts by that producer to produce fail with ProducerFenced.
> see attached snippets
> As the caller of Admin.fenceProducers() is likely unaware of the producer's
> state, the call should succeed regardless



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced

2023-01-27 Thread Edoardo Comar (Jira)
Edoardo Comar created KAFKA-14657:
-

 Summary: Admin.fenceProducers fails when Producer has ongoing 
transaction - but Producer gets fenced
 Key: KAFKA-14657
 URL: https://issues.apache.org/jira/browse/KAFKA-14657
 Project: Kafka
  Issue Type: Bug
  Components: admin
Reporter: Edoardo Comar
 Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java

{{Admin.fenceProducers() }}
fails with a ConcurrentTransactionsException if invoked when a Producer has a 
transaction ongoing.
However, further attempts by that producer to produce fail with 
InvalidProducerEpochException and the producer is not re-usable, 
cannot abort/commit as it is fenced.

Conversely, if 
{{Admin.fenceProducers() }}
is invoked while there is no open transaction, the call succeeds and further 
attempts by that producer to produce fail with ProducerFenced.

see attached snippets 

As the caller of {{Admin.fenceProducers() }} the call should succeed regardless 
of the state of the producer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-5613) Deprecate JmxTool?

2023-01-27 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681405#comment-17681405
 ] 

Federico Valeri commented on KAFKA-5613:


FYI, there is an open PR to migrate/convert the JmxTool as part of KAFKA-14525.

[https://github.com/apache/kafka/pull/13136]

> Deprecate JmxTool?
> --
>
> Key: KAFKA-5613
> URL: https://issues.apache.org/jira/browse/KAFKA-5613
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>
> According to git-blame, JmxTool has been around since October 2011. We use it 
> in system tests, but we are thinking it might be best to replace it: 
> https://issues.apache.org/jira/browse/KAFKA-5612
> When making modifications for system tests, we've had to take into account 
> compatibility because this tool is technically included in our distribution 
> and, perhaps unintentionally, a public utility.
> We know that "real" tools for JMX, like jmxtrans, are more commonly used, but 
> we don't know who might be using JmxTool simply because it ships with Kafka. 
> That said, it also isn't documented in the Kafka documentation, so you 
> probably have to dig around to find it.
> Hopefully we can deprecate this and eventually move it either to a jar that 
> is only used for system tests, or even better just remove it entirely. To do 
> any of this, we'd probably need to do at least a cursory survey of the 
> community to get a feel for usage level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14615) Use sizeCompare(Iterable[_]) to compare two iterables

2023-01-27 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681404#comment-17681404
 ] 

Federico Valeri edited comment on KAFKA-14615 at 1/27/23 4:49 PM:
--

FYI, there is an open PR to migrate/convert the JmxTool as part of KAFKA-14525.

[https://github.com/apache/kafka/pull/13136]


was (Author: fvaleri):
FYI, there is an open PR to migrate/convert the JmxTool: 
https://github.com/apache/kafka/pull/13136

> Use sizeCompare(Iterable[_]) to compare two iterables
> -
>
> Key: KAFKA-14615
> URL: https://issues.apache.org/jira/browse/KAFKA-14615
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Assignee: Kamalesh Palanisamy
>Priority: Minor
>  Labels: Newbie, newbie
> Fix For: 4.0.0
>
>
> Since Scala 2.12 is being deprecated in 4.x version, we can utilize some 
> improved methods for comparing size of scala collections which were 
> introduced starting 2.13.
> This task is to find and replace usage of code paths where we use 
> (IterableA.size == IterableB.size) having a complexity of O(IterableA size + 
> IterableB size) with sizeCompare() method which has a complexity of 
> O(min(IterableA size, IterableB size))
> Some examples where sizeCompare() could be used are:
> 1. 
> [https://github.com/apache/kafka/blob/78d4458b94e585bc602a4ae307d3de54fcedf2af/core/src/main/scala/kafka/server/KafkaApis.scala#L1177]
> 2. 
> [https://github.com/apache/kafka/blob/78d4458b94e585bc602a4ae307d3de54fcedf2af/core/src/main/scala/kafka/tools/JmxTool.scala#L215]
>  
> 3. 
>  [1] [https://github.com/scala/scala/pull/6758]
>  [2] [https://github.com/scala/scala/pull/6950] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14615) Use sizeCompare(Iterable[_]) to compare two iterables

2023-01-27 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681404#comment-17681404
 ] 

Federico Valeri commented on KAFKA-14615:
-

FYI, there is an open PR to migrate/convert the JmxTool: 
https://github.com/apache/kafka/pull/13136

> Use sizeCompare(Iterable[_]) to compare two iterables
> -
>
> Key: KAFKA-14615
> URL: https://issues.apache.org/jira/browse/KAFKA-14615
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Assignee: Kamalesh Palanisamy
>Priority: Minor
>  Labels: Newbie, newbie
> Fix For: 4.0.0
>
>
> Since Scala 2.12 is being deprecated in 4.x version, we can utilize some 
> improved methods for comparing size of scala collections which were 
> introduced starting 2.13.
> This task is to find and replace usage of code paths where we use 
> (IterableA.size == IterableB.size) having a complexity of O(IterableA size + 
> IterableB size) with sizeCompare() method which has a complexity of 
> O(min(IterableA size, IterableB size))
> Some examples where sizeCompare() could be used are:
> 1. 
> [https://github.com/apache/kafka/blob/78d4458b94e585bc602a4ae307d3de54fcedf2af/core/src/main/scala/kafka/server/KafkaApis.scala#L1177]
> 2. 
> [https://github.com/apache/kafka/blob/78d4458b94e585bc602a4ae307d3de54fcedf2af/core/src/main/scala/kafka/tools/JmxTool.scala#L215]
>  
> 3. 
>  [1] [https://github.com/scala/scala/pull/6758]
>  [2] [https://github.com/scala/scala/pull/6950] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-7666) KIP-391: Allow Producing with Offsets for Cluster Replication

2023-01-27 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar resolved KAFKA-7666.
--
Resolution: Won't Fix

KIP has been retired

> KIP-391: Allow Producing with Offsets for Cluster Replication
> -
>
> Key: KAFKA-7666
> URL: https://issues.apache.org/jira/browse/KAFKA-7666
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
>
> Implementing KIP-391
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-391%3A+Allow+Producing+with+Offsets+for+Cluster+Replication



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-27 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1089108841


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   1) Yes, per the docs of WritableByteChannel
   
   ```
   Writes a sequence of bytes to this channel from the given buffer.
   An attempt is made to write up to r bytes to the channel, where r is the 
number of bytes remaining in the buffer, that is, src.remaining(), at the 
moment this method is invoked.
   
   Suppose that a byte sequence of length n is written, where 0 <= n <= r. This 
byte sequence will be transferred from the buffer starting at index p, where p 
is the buffer's position at the moment this method is invoked; the index of the 
last byte written will be p + n - 1. Upon return the buffer's position will be 
equal to p + n; its limit will not have changed.
   ```
   
   2) Good point, although I don't think this has an effect on any of the 4 
current users (DefaultRecord.writeTo and LegacyRecord.writeTo for writing key 
and value), I've added a defensive call to asReadOnlyBuffer.
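
A minimal sketch (illustrative only, not necessarily the exact change in this PR)
of how writing through a read-only duplicate keeps the caller's buffer position
untouched while still capping the write at `length` bytes:

```
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class WriteToSketch {
    // Writes `length` bytes of `buffer`, starting at its current position,
    // without mutating the caller's buffer: the channel consumes a duplicate.
    static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
        if (buffer.hasArray()) {
            out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
        } else {
            ByteBuffer view = buffer.asReadOnlyBuffer();   // independent position/limit
            view.limit(view.position() + length);          // cap the write at `length` bytes
            WritableByteChannel channel = Channels.newChannel(out);
            while (view.hasRemaining()) {                  // a single write() may be partial
                channel.write(view);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        direct.putLong(42L).flip();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeTo(new DataOutputStream(bytes), direct, 8);
        // The caller's buffer position is unchanged by the non-array path.
        System.out.println("written=" + bytes.size() + ", position=" + direct.position());
    }
}
```

The loop around `channel.write` accounts for a stream-backed channel accepting
fewer bytes than remain in a single call.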



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14584) Move StateChangeLogMerger to tools

2023-01-27 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri reassigned KAFKA-14584:
---

Assignee: Federico Valeri

> Move StateChangeLogMerger to tools
> --
>
> Key: KAFKA-14584
> URL: https://issues.apache.org/jira/browse/KAFKA-14584
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-27 Thread via GitHub


Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1088967227


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   - Is this preserving the same behaviour i.e. copy in the range `[pos, pos + 
length]`? 
   - This is mutating the position of the `buffer`, as opposed to the current 
implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14591) Move DeleteRecordsCommand to tools

2023-01-27 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-14591:
--

Assignee: Mickael Maison

> Move DeleteRecordsCommand to tools
> --
>
> Key: KAFKA-14591
> URL: https://issues.apache.org/jira/browse/KAFKA-14591
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9975) KIP-611: Improved Handling of Abandoned Connectors and Tasks

2023-01-27 Thread Sayed Mohammad Hossein Torabi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681320#comment-17681320
 ] 

Sayed Mohammad Hossein Torabi commented on KAFKA-9975:
--

I would like to work on this KIP

> KIP-611: Improved Handling of Abandoned Connectors and Tasks
> 
>
> Key: KAFKA-9975
> URL: https://issues.apache.org/jira/browse/KAFKA-9975
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> (To be fleshed out once 
> [KIP-611|https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks]
>  is finalized)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

2023-01-27 Thread via GitHub


Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1088951633


##
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) {
      * Waits if necessary for this future to complete, and then returns its result.
      */
     @Override
-    public T get() throws InterruptedException, ExecutionException {
+    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   That makes sense! I think this change might be in here by accident -- I 
thought I had done that. Thanks for pointing that out. I'll remove it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13169: Port opening fix

2023-01-27 Thread via GitHub


Hangleton commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1088918527


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+/**
+ * Wait for a future to complete, or time out.
+ *
+ * @param future        The future to wait for.
+ * @param deadlineNs    The time in the future, in monotonic nanoseconds, to time out.
+ * @return              The result of the future.
+ * @param <T>           The type of the future.
+ */
+default <T> T waitForFuture(
+    CompletableFuture<T> future,
+    long deadlineNs
+) throws TimeoutException, InterruptedException, ExecutionException  {
+    TimeoutException timeoutException = null;
+    while (true) {
+        long nowNs = nanoseconds();
+        if (deadlineNs <= nowNs) {
+            throw (timeoutException == null) ? new TimeoutException() : timeoutException;
+        }
+        long deltaNs = deadlineNs - nowNs;
+        try {
+            return future.get(deltaNs, TimeUnit.NANOSECONDS);

Review Comment:
   Naive question - what is incorrect with 
   
   ```
   default <T> T waitForFuture(
       CompletableFuture<T> future,
       long deadlineNs
   ) throws TimeoutException, InterruptedException, ExecutionException  {
       return future.get(Math.max(0, deadlineNs - nanoseconds()), TimeUnit.NANOSECONDS);
   }
   ```



##
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+
+
+public class FutureUtils {
+    /**
+     * Based on the current time and a delay, computes a monotonic deadline in the future.
+     *
+     * @param nowNs     The current time in monotonic nanoseconds.
+     * @param delayMs   The delay in milliseconds.
+     * @return          The monotonic deadline in the future. This value is capped at
+     *                  Long.MAX_VALUE.
+     */
+    public static long getDeadlineNsFromDelayMs(
+        long nowNs,
+        long delayMs
+    ) {
+        if (delayMs < 0) {
+            throw new RuntimeException("Negative delays are not allowed.");
+        }
+        BigInteger delayNs = BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000));

Review Comment:
   Curious - which use cases result in overflow of the long values in this 
context?
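
One way the long arithmetic could overflow (illustrative numbers, not a known Kafka
call site): a caller passing an effectively unbounded delay makes `delayMs * 1_000_000`
wrap, which is presumably what the BigInteger conversion (or an equivalent saturating
computation) guards against:

```
public class DeadlineOverflowSketch {
    public static void main(String[] args) {
        long nowNs = System.nanoTime();
        long delayMs = Long.MAX_VALUE / 1_000_000 + 1;   // "wait practically forever"

        long naive = nowNs + delayMs * 1_000_000;        // multiplication wraps negative
        System.out.println("naive deadline:     " + naive);

        long saturated;                                  // cap at Long.MAX_VALUE instead
        try {
            saturated = Math.addExact(nowNs, Math.multiplyExact(delayMs, 1_000_000L));
        } catch (ArithmeticException e) {
            saturated = Long.MAX_VALUE;
        }
        System.out.println("saturated deadline: " + saturated);
    }
}
```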



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13161: Kafka 14128

2023-01-27 Thread via GitHub


Hangleton commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1088898259


##
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) {
      * Waits if necessary for this future to complete, and then returns its result.
      */
     @Override
-    public T get() throws InterruptedException, ExecutionException {
+    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   Apologies if I misread, why not use the method `get` with timeout line 177?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools

2023-01-27 Thread via GitHub


fvaleri commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1406316345

   @mimaison I've rebased and added some tests. 
   
   This is now ready for another review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes

2023-01-27 Thread via GitHub


mimaison commented on PR #13060:
URL: https://github.com/apache/kafka/pull/13060#issuecomment-1406261073

   Thanks for the PR! We're moving JmxTool to the tools module in 
https://github.com/apache/kafka/pull/13136. Let's do the rewrite/move first, 
then I'll take a look at this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org