Re: [PR] KAFKA-14957: Update-Description-String [kafka]

2023-10-28 Thread via GitHub


github-actions[bot] commented on PR #13909:
URL: https://github.com/apache/kafka/pull/13909#issuecomment-1783988680

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from
trunk (or the appropriate release branch). If this PR is no longer valid or
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375357830


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I think you misunderstood my point. I was saying that there are two types of
callbacks: (1) those that execute on the same thread and (2) those that execute on a
different thread. They're not the same, and the latter require more care.
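A minimal Java sketch of the distinction drawn above, for illustration only: `CallbackThreadsSketch`, `invokeInline` and `invokeOnExecutor` are hypothetical names, not Kafka APIs, and the blocking `join()` exists only so the sketch can return which thread ran the callback. A type (1) callback observes the caller's thread; a type (2) callback does not, which is why anything thread-bound it captured needs extra care.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class CallbackThreadsSketch {
    // Type (1): the callback is invoked synchronously, so it runs on the caller's thread
    // and sees exactly the same thread-bound state the caller does.
    static <R> R invokeInline(Supplier<R> callback) {
        return callback.get();
    }

    // Type (2): the callback is handed to an executor and runs on one of its threads.
    // Anything it captured from the caller's thread travels with it to that other thread.
    static <R> R invokeOnExecutor(Supplier<R> callback, Executor executor) {
        // join() blocks only so the sketch can return the result; real code would not wait here.
        return CompletableFuture.supplyAsync(callback, executor).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Thread caller = Thread.currentThread();
            Thread inline = invokeInline(Thread::currentThread);
            Thread other = invokeOnExecutor(Thread::currentThread, pool);
            System.out.println(inline == caller); // true
            System.out.println(other == caller);  // false
        } finally {
            pool.shutdown();
        }
    }
}
```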






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375355538


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I think how much work is done in the callback is more a quantitative
difference than a qualitative one: how much work needs to be done before we
treat it as a new pattern that requires special care (and what is that care;
can we qualify it)? I agree that if the functionality in the callback just
happened to come after the point of use of the RequestLocal, then we wouldn't
hit the problem (even if we captured it in the callback), but that seems to be
a matter of random "luck". I don't think "don't be unlucky" is useful
engineering guidance. On the other hand, saying "don't pass a thread local as
an argument; access it at the point of use as a thread local instead" would
avoid these cases.
   
   But maybe the first point to discuss is: do we agree that the root cause is
that RequestLocal is bound to the executing thread context (effectively a
thread local passed as an argument), which is not a typical pattern in Java,
or do we think it's something else? At some point I think there was a theory
that the machinery for re-scheduling the callback on the request thread pool
could be the culprit, but I think we all agree that the problem would be the
same even if the callback was executed directly on the network thread (or in
pretty much any way other than scheduling it on the
exact same thread, which is doable, btw, but would hinder performance).
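A minimal sketch of the hazard and of the alternative suggested above, assuming a per-thread object comparable to RequestLocal. `ThreadContext`, `CONTEXT` and both methods are hypothetical, and this is not how RequestLocal is implemented today. Passing the caller's context into a callback that runs on another thread leaves the callback holding the wrong thread's context, while looking it up at the point of use always yields the running thread's own instance.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadBoundContextSketch {
    // Hypothetical stand-in for a per-thread context such as RequestLocal:
    // one instance per thread, remembering the thread it was created on.
    static final class ThreadContext {
        final Thread owner = Thread.currentThread();
    }

    static final ThreadLocal<ThreadContext> CONTEXT = ThreadLocal.withInitial(ThreadContext::new);

    // Pattern under discussion: the caller's context is passed in as an argument and captured
    // by the callback, so the callback keeps using it even after moving to another thread.
    static boolean capturedContextMatches(ThreadContext captured, Executor executor) {
        return CompletableFuture
            .supplyAsync(() -> captured.owner == Thread.currentThread(), executor)
            .join();
    }

    // Suggested alternative: resolve the context at the point of use,
    // on whichever thread actually runs the callback.
    static boolean pointOfUseContextMatches(Executor executor) {
        return CompletableFuture
            .supplyAsync(() -> CONTEXT.get().owner == Thread.currentThread(), executor)
            .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(capturedContextMatches(CONTEXT.get(), pool)); // false: another thread's context
            System.out.println(pointOfUseContextMatches(pool));              // true
        } finally {
            pool.shutdown();
        }
    }
}
```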






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375332013


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   A lot of callback code runs on the same thread though - the cases where a 
different thread is used require more care and have caused a bunch of pain in 
the past.






Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-10-28 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1375331724


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * 
+ *
+ * Sum is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric for "connection-close", but it
+ * has to be created with a "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the
+ * compound type, each component measurable is converted to a 

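The type mapping described in the Javadoc above can be summarized in a small sketch. It is illustrative only: `MetricTypeSketch`, `OutputType`, `typeFor` and `meterComponents` are hypothetical, stats are identified by class-name strings to keep it self-contained, `WindowedSum` is assumed to be the class behind the Javadoc's "Sum", and mapping the remaining simple stats to gauges is an assumption the quoted text does not state.

```java
import java.util.Arrays;
import java.util.List;

final class MetricTypeSketch {
    // Hypothetical output kinds, named after CUMULATIVE_DOUBLE / GAUGE_DOUBLE in the Javadoc.
    enum OutputType { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

    // Classifies a simple (single-value) stat by its class name.
    static OutputType typeFor(String statClassName) {
        switch (statClassName) {
            case "CumulativeCount": // counts only increase in steady state (assumed cumulative too)
            case "CumulativeSum":   // "Total": an un-windowed cumulative total
                return OutputType.CUMULATIVE_DOUBLE;
            case "WindowedSum":     // assumed to be the Javadoc's "Sum": a sample sum, not cumulative
                return OutputType.GAUGE_DOUBLE;
            default:                // assumption: Avg, Min, Max, Rate, SimpleRate, ... as gauges
                return OutputType.GAUGE_DOUBLE;
        }
    }

    // A Meter is never reported as-is: it is registered as a "-rate" and a "-total" metric.
    static List<String> meterComponents(String baseName) {
        return Arrays.asList(baseName + "-rate", baseName + "-total");
    }

    public static void main(String[] args) {
        System.out.println(typeFor("CumulativeSum"));            // CUMULATIVE_DOUBLE
        System.out.println(typeFor("WindowedSum"));              // GAUGE_DOUBLE
        System.out.println(meterComponents("connection-close")); // [connection-close-rate, connection-close-total]
    }
}
```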
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-10-28 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1375330896


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource tags.
+ * These additional resource tags will be added before emitting metrics by the telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+    private final MetricKey key;
+    private final Metric.Builder metricBuilder;
+
+    private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+        this.key = key;
+        this.metricBuilder = metricBuilder;
+    }
+
+    @Override
+    public MetricKey key() {
+        return key;
+    }
+
+    public Metric.Builder builder() {
+        return metricBuilder;
+    }
+
+    public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) {

Review Comment:
   Yeah, though it makes sense, I would let @xvrl decide, as these classes
will be used elsewhere as well and @xvrl was the original author of this class.






Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-10-28 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1375330776


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource tags.
+ * These additional resource tags will be added before emitting metrics by the telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+    private final MetricKey key;
+    private final Metric.Builder metricBuilder;
+
+    private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+        this.key = key;
+        this.metricBuilder = metricBuilder;
+    }
+
+    @Override
+    public MetricKey key() {
+        return key;
+    }
+
+    public Metric.Builder builder() {
+        return metricBuilder;
+    }
+
+    public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) {
+        return new SinglePointMetric(metricKey, metric);
+    }
+
+    /*
+        Methods to construct gauge metric type.
+     */
+    public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    /*
+        Methods to construct sum metric type.
+     */
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) {
+        return sum(metricKey, value, monotonic, timestamp, null);
+    }
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp,
+        Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        if (startTimestamp != null) {
+            point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+        }
+
+        return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+    }
+
+    public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic,
+        Instant timestamp, Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value)
+            .setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+        return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+    }
+
+    /*
+        Helper methods to support metric construction.
+     */
+    private static SinglePointMetric sum(MetricKey metricKey, AggregationTemporality aggregationTemporality,
+        boolean monotonic, NumberDataPoint.Builder point) {
+        Objects.requireNonNull(point, "metric number data point cannot be null");
+        Objects.requireNonNull(metricKey, "metric key cannot be null");
+
+        point.addAllAttributes(asAttributes(metricKey.tags()));
+
+        Metric.Builder metric = Metric.newBuilder().setName(metricKey.getName());
+        metric
+            .getSumBuilder()
+            .setAggregationTemporality(aggregationTemporality)
+            .setIsMonotonic(monotonic)
+            .addDataPoints(point);
+        return create(metricKey, metric);
+    }
+
+    private static SinglePointMetric gauge(MetricKey metricKey, NumberDataPoint.Builder point) {
+        Objects.requireNonNull(point, "metric number data point cannot be null");

Review Comment:
   Can drop, not adding much value. Removed it.
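Both sum builders in the quoted hunk call a `toTimeUnixNanos` helper whose body lies outside the diff context. A minimal sketch, assuming it converts an `Instant` to nanoseconds since the Unix epoch, which is the unit of OTLP's `time_unix_nano` and `start_time_unix_nano` data-point fields; the class name `UnixNanosSketch` is hypothetical.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

final class UnixNanosSketch {
    // Converts an Instant to nanoseconds since the Unix epoch. A signed 64-bit count of
    // nanoseconds only covers roughly the years 1677 to 2262, so later instants do not fit.
    static long toTimeUnixNanos(Instant t) {
        return TimeUnit.SECONDS.toNanos(t.getEpochSecond()) + t.getNano();
    }

    public static void main(String[] args) {
        System.out.println(toTimeUnixNanos(Instant.EPOCH));                 // 0
        System.out.println(toTimeUnixNanos(Instant.ofEpochSecond(1, 500))); // 1000000500
        System.out.println(toTimeUnixNanos(Instant.ofEpochMilli(1_500)));   // 1500000000
    }
}
```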



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375329122


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   That's correct -- we don't create new concurrent access, i.e. this callback
is not called on multiple threads at the same time; all we do here is
potentially execute this callback on a different thread. Actually, I think the
"Accessing non-thread-safe data structures should be avoided if possible"
comment is misleading here: the only thing we have to avoid is capturing
thread-local (or effectively thread-local) objects in the callback. Other
objects need only be as thread safe as they would be if this method were
called directly on this thread (i.e. if an object was rooted on the call
stack, it doesn't have to be thread safe because it's not going to be used
concurrently from multiple threads).
   
   BTW, we have plenty of cases in Kafka (both client and server) where we pass
a callback to continue execution of a single-threaded logical task after an RPC
call is complete (most likely on a different thread), so I don't think we're
dealing with a unique pattern here. The only thing that is unique (and tricky)
here is that RequestLocal disguises itself as the context of a logical task
(arguments are generally logical task context), but it is actually a
concrete physical thread context that would be different (or missing
completely, say if we executed this callback on the network thread) if we
continued execution of this logical task on a different thread.
   
   I've already pointed out elsewhere that (in a future change) we should
probably remove the RequestLocal from the argument list, make it a
thread-local (which it effectively is), and just use it directly where it's
needed.
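A minimal sketch of the point about objects rooted on the call stack, under stated assumptions: `SequentialHandoffSketch` and `runTask` are hypothetical, and the caller blocks on `join()` only to keep the example short, whereas real code would simply return and let the callback finish the task. Because the two stages never run concurrently, a plain `ArrayList` is safe here; the ordering comes from the memory-consistency guarantees that `java.util.concurrent` documents for executor hand-offs and future completion, not from the list itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SequentialHandoffSketch {
    // A single-threaded logical task that starts on the caller's thread and continues in a
    // callback on another thread. `steps` is a plain, non-thread-safe ArrayList: that is fine
    // because the two stages never touch it at the same time.
    static List<String> runTask(Executor executor) {
        List<String> steps = new ArrayList<>(); // rooted on this call stack
        steps.add("started on " + Thread.currentThread().getName());
        CompletableFuture
            .runAsync(() -> steps.add("continued on " + Thread.currentThread().getName()), executor)
            .join(); // the caller does not touch `steps` again until the continuation has finished
        steps.add("finished");
        return steps;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(runTask(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```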






Re: [PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14661:
URL: https://github.com/apache/kafka/pull/14661#discussion_r1375311183


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -469,7 +469,7 @@ public void createTopic(CommandTopicPartition topic) throws Exception {
 CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),
 new CreateTopicsOptions().retryOnQuotaViolation(false));
 createResult.all().get();
-System.out.println("Created topic " + topic.name + ".");
+System.out.println("Created topic " + topic.name.get() + ".");

Review Comment:
   Thanks for the fix. I think we should pass the topic name to `createTopic` 
so we don't have to call `.get` in so many places.
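A minimal sketch of that suggestion, assuming `topic.name` is an `Optional<String>` (which the added `.get()` suggests). It also shows why the original line printed oddly: concatenating an `Optional` into a string uses `Optional.toString()`, producing `Optional[orders]` rather than `orders`. `TopicNameSketch`, `TopicOptions` and `run` are hypothetical stand-ins, not TopicCommand's actual classes.

```java
import java.util.Optional;

final class TopicNameSketch {
    // Hypothetical stand-in for CommandTopicPartition; only the Optional<String> name matters here.
    static final class TopicOptions {
        final Optional<String> name;

        TopicOptions(Optional<String> name) {
            this.name = name;
        }
    }

    // Entry point: unwrap the Optional once, failing fast with a clear message if it is absent.
    static String run(TopicOptions opts) {
        String topicName = opts.name.orElseThrow(() -> new IllegalArgumentException("a topic name is required"));
        return createTopic(topicName, opts);
    }

    // Takes the plain topic name, so no .get() calls are needed in the body.
    static String createTopic(String topicName, TopicOptions opts) {
        // ... build the NewTopic and call the admin client here (omitted) ...
        return "Created topic " + topicName + ".";
    }

    public static void main(String[] args) {
        Optional<String> name = Optional.of("orders");
        System.out.println("Created topic " + name + "."); // Created topic Optional[orders].
        System.out.println(run(new TopicOptions(name)));  // Created topic orders.
    }
}
```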






Re: [PR] fix: replace an inefficient loop in kafka internals [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1375307580


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
  * @param length The number of bytes to write
  * @throws IOException For any errors writing to the output
  */
-public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
 if (buffer.hasArray()) {
 out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
 } else {
-int pos = buffer.position();
-for (int i = pos; i < length + pos; i++)
-out.writeByte(buffer.get(i));
+Channels.newChannel(out).write(buffer);

Review Comment:
   Sorry for the delay. I was trying to understand how to show the improvement, 
but we seem to always pass a heap byte buffer. How can I reproduce this 
improvement?
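One way to reach the non-array branch is a direct buffer: `ByteBuffer.allocate` returns a heap buffer whose `hasArray()` is `true`, while buffers from `ByteBuffer.allocateDirect` normally report `false`, so callers that only pass heap buffers never take the `else` path. The standalone sketch below (`WriteToSketch` is a hypothetical name, not Kafka's `Utils`) exercises that path. It writes a bounded `duplicate()` so that exactly `length` bytes are written and the caller's position is left unchanged, matching the old loop; `Channels.newChannel(out).write(buffer)` as written in the diff instead writes all remaining bytes and advances the buffer's position.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;

final class WriteToSketch {
    // Writes `length` bytes starting at buffer.position() without changing the buffer's position.
    static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
        if (buffer.hasArray()) {
            // Heap buffer: write straight from the backing array.
            out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
        } else {
            // Direct (or read-only) buffer: bound a duplicate to `length` bytes and copy it in bulk.
            ByteBuffer view = buffer.duplicate();
            view.limit(view.position() + length);
            Channels.newChannel(out).write(view);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(ByteBuffer.allocate(8).hasArray());       // true
        System.out.println(ByteBuffer.allocateDirect(8).hasArray()); // false

        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        direct.put(new byte[] {1, 2, 3, 4, 5});
        direct.flip();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeTo(new DataOutputStream(bytes), direct, 3);
        System.out.println(bytes.size());      // 3
        System.out.println(direct.position()); // 0
    }
}
```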






Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-10-28 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1375284308


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return

Review Comment:
   As @xvrl mentioned in the comment here, the operations have to be thread
safe: https://github.com/apache/kafka/pull/14620#discussion_r1375028442






Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-10-28 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1375284207


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+    private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+    /**
+     * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one.
+     *
+     * @param metricKey the key for which to calculate a getAndSet.
+     * @param now the timestamp for the new value.
+     * @param value the current value.
+     * @return the timestamp of the previous entry and its value. If there
+     * isn't a previous entry, then this method returns {@link Optional#empty()}
+     */
+    public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+        InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+        AtomicReference<InstantAndValue<T>> valueOrNull = counters

Review Comment:
   To make the operations thread safe, as MetricsCollector can invoke the method
concurrently.
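The quoted `getAndSet` stops just after `counters`, so one possible lock-free completion is sketched here, assuming `ConcurrentMap.putIfAbsent` plus `AtomicReference.getAndSet`. It is a sketch, not the PR's code: the key type is generalized to `K` instead of `MetricKey`, and `InstantAndValue` is redefined locally. Exactly one concurrent caller sees an empty result for a new key, and every later caller receives the entry it replaced.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

final class LastValueTrackerSketch<K, T> {
    // Pairs a value with the time it was recorded.
    static final class InstantAndValue<V> {
        final Instant instant;
        final V value;

        InstantAndValue(Instant instant, V value) {
            this.instant = instant;
            this.value = value;
        }
    }

    private final ConcurrentMap<K, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();

    // Records (now, value) for key and returns the entry it replaced, or empty on the first call.
    Optional<InstantAndValue<T>> getAndSet(K key, Instant now, T value) {
        InstantAndValue<T> next = new InstantAndValue<>(now, value);
        // putIfAbsent is atomic, so exactly one concurrent caller installs the first entry.
        AtomicReference<InstantAndValue<T>> existing = counters.putIfAbsent(key, new AtomicReference<>(next));
        if (existing == null) {
            return Optional.empty();
        }
        // Every later caller swaps in its entry and gets back the previous one, without locking.
        return Optional.of(existing.getAndSet(next));
    }

    public static void main(String[] args) {
        LastValueTrackerSketch<String, Double> tracker = new LastValueTrackerSketch<>();
        System.out.println(tracker.getAndSet("bytes-total", Instant.ofEpochSecond(10), 5.0).isPresent()); // false
        System.out.println(tracker.getAndSet("bytes-total", Instant.ofEpochSecond(20), 8.0).get().value); // 5.0
    }
}
```

A caller such as a metrics collector could then derive a delta from the returned previous value and instant, which is where the atomic swap matters when collections overlap.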






Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-10-28 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1375283421


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state.
+ * A count metric that decreases should be treated as a bug.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The Javadoc for the Total metric type
+ * says "An un-windowed cumulative total maintained over all time." The standalone Total metrics in
+ * the codebase seem to be cumulative metrics that always increase. The Total metric underlying the
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but this is not guaranteed, as the sample values
+ * may be negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * 
+ *
+ * Sum is a sample sum, which is not a cumulative metric, so it is converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric for "connection-close", but it
+ * has to be created with a "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric
+ * is reported as a set of measurables with name = Frequency.name(). As there is no way to figure out the
+ * compound type, each component measurable is converted to a 

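The Javadoc quoted above (cut off in this archive) describes how Kafka metric types are mapped before export. As a rough, JDK-only illustration of the rules it states explicitly: a Total is exported as CUMULATIVE_DOUBLE, a sampled Sum as GAUGE_DOUBLE, and a compound Meter is never exported itself but surfaces as its rate and total components. The `StatKind` and `ExportedType` enums and both helper names are hypothetical stand-ins, not the classes in `org.apache.kafka.common.telemetry.internals`.

```java
import java.util.List;

final class MetricTypeMappingSketch {
    // Hypothetical labels for the stat kinds discussed in the Javadoc.
    enum StatKind { TOTAL, SAMPLED_SUM, METER }

    // Hypothetical stand-ins for the exported types named in the Javadoc.
    enum ExportedType { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

    private MetricTypeMappingSketch() {}

    // Simple stats map directly to one exported type.
    static ExportedType exportedTypeOf(StatKind kind) {
        switch (kind) {
            case TOTAL:
                // Assumed to increase monotonically, so exported as a cumulative value.
                return ExportedType.CUMULATIVE_DOUBLE;
            case SAMPLED_SUM:
                // A windowed sample sum can go down, so it is exported as a gauge.
                return ExportedType.GAUGE_DOUBLE;
            default:
                // Compound stats such as Meter are never reported as-is.
                throw new IllegalArgumentException(kind + " is a compound stat");
        }
    }

    // Following the Selector example: a Meter is registered as a "-rate" metric
    // and a "-total" metric, and only those two names are ever reported.
    static List<String> meterComponentNames(String baseName) {
        return List.of(baseName + "-rate", baseName + "-total");
    }
}
```

In Kafka itself the two component names are supplied when the Meter is created, so the `-rate`/`-total` suffixes here simply follow the `connection-close` example rather than a rule the library enforces.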
[PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]

2023-10-28 Thread via GitHub


runom opened a new pull request, #14661:
URL: https://github.com/apache/kafka/pull/14661

   Print topic name as `String` value instead of `Optional`.
   
   before
   ```
   % bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
   Created topic Optional[test].
   ```
   
   after
   ```
   % bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
   Created topic test.
   ```
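   The `Optional[test]` output comes from string-concatenating an `Optional`, which calls `Optional.toString()`. A minimal JDK-only sketch of the before/after behaviour; the class and method names, and the `<unknown>` fallback for an empty value, are illustrative assumptions rather than the code in `TopicCommand`.

   ```java
   import java.util.Optional;

   final class TopicNameMessage {
       private TopicNameMessage() {}

       // Before: concatenation calls Optional.toString(), which (in OpenJDK)
       // renders a present value as "Optional[test]".
       static String before(Optional<String> topic) {
           return "Created topic " + topic + ".";
       }

       // After: unwrap the Optional so only the topic name is printed.
       // The "<unknown>" fallback is a hypothetical choice for the empty case.
       static String after(Optional<String> topic) {
           return "Created topic " + topic.orElse("<unknown>") + ".";
       }
   }
   ```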
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Avoid a couple of map copies in `KRaftMetadataCache.getPartitionReplicaEndpoints` [kafka]

2023-10-28 Thread via GitHub


ijuma opened a new pull request, #14660:
URL: https://github.com/apache/kafka/pull/14660

   Neither the `toMap` nor the `filter` seems to be necessary.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-28 Thread via GitHub


ijuma commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1783833262

   I think this doesn't compile with Scala 2.12; that needs to be fixed before we can proceed.





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375263740


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Actually, I don't think this method enforces that these maps are immutable. That said, my understanding is that we basically suspend processing, and hence only one thread accesses these structures at a time. Is that correct? If so, the main thing we need to do is ensure we safely publish these data structures; they don't necessarily have to be immutable.
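
   The distinction between immutability and safe publication can be illustrated with plain JDK concurrency primitives. This is a generic sketch, not how `ReplicaManager` hands data to its callbacks: the map contents are hypothetical, and a single-thread `ExecutorService` stands in for "a different request thread". The `java.util.concurrent` memory-consistency rules guarantee that writes made before submitting a task are visible to that task, so a mutable `HashMap` is safe here as long as the submitting thread stops touching it.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   final class SafeHandoffSketch {
       private SafeHandoffSketch() {}

       static int sumOnAnotherThread() {
           ExecutorService otherThread = Executors.newSingleThreadExecutor();
           try {
               // A plain mutable map: neither immutable nor internally synchronized.
               Map<String, Integer> recordsPerPartition = new HashMap<>();
               recordsPerPartition.put("topic-0", 3);
               recordsPerPartition.put("topic-1", 4);

               // Handing the map to the executor publishes it safely: actions before
               // submit() happen-before the task runs. This thread must not touch
               // the map again, so only one thread ever accesses it at a time.
               Callable<Integer> callback = () ->
                   recordsPerPartition.values().stream().mapToInt(Integer::intValue).sum();
               Future<Integer> result = otherThread.submit(callback);

               // Future.get() makes the task's result visible back on this thread.
               return result.get();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           } catch (ExecutionException e) {
               throw new IllegalStateException(e.getCause());
           } finally {
               otherThread.shutdown();
           }
       }
   }
   ```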






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375262871


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   Can we file a JIRA for investigating the alternative? I agree with @junrao that it would be much simpler to reason about if we always schedule it.
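
   The "always schedule" alternative can be sketched in a few lines. Everything below is hypothetical: `RequestLocal` is a minimal stand-in for Kafka's class of the same name, and a plain `java.util.concurrent.Executor` stands in for the request handler thread that owns that `RequestLocal` and its callback queue. It is not the `KafkaRequestHandler.wrap` implementation from the diff.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.Executor;
   import java.util.function.BiConsumer;
   import java.util.function.Consumer;

   final class AlwaysScheduleSketch {
       private AlwaysScheduleSketch() {}

       // Hypothetical stand-in for Kafka's per-thread RequestLocal state.
       static final class RequestLocal {
           final String owner;

           RequestLocal(String owner) {
               this.owner = owner;
           }
       }

       // "Always schedule" variant: the wrapped callback never runs inline. It is
       // always handed to the executor standing in for the handler thread that
       // owns `requestLocal`, so the callback and its RequestLocal always match.
       static <T> Consumer<T> wrap(BiConsumer<RequestLocal, T> fun,
                                   RequestLocal requestLocal,
                                   Executor requestHandlerThread) {
           return value -> requestHandlerThread.execute(() -> fun.accept(requestLocal, value));
       }

       // Usage: a list of pending Runnables plays the role of the handler's queue.
       static List<String> demo() {
           List<Runnable> pending = new ArrayList<>();
           List<String> seen = new ArrayList<>();
           Consumer<String> callback = wrap(
               (local, value) -> seen.add(local.owner + ":" + value),
               new RequestLocal("handler-1"),
               pending::add);

           callback.accept("append-done");   // only enqueues; nothing runs yet
           if (!seen.isEmpty()) {
               throw new IllegalStateException("callback ran inline");
           }
           pending.forEach(Runnable::run);   // the "handler thread" drains its queue
           return seen;
       }
   }
   ```

   Because the callback never takes an inline path, there is only one execution context to reason about, at the cost of an extra hop through the queue even when the caller is already on the right thread.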






Re: [PR] Change `CreateTopicPolicy.RequestMetadata` to always provide partitions, replication factor and replica assignments. [kafka]

2023-10-28 Thread via GitHub


ijuma commented on PR #14655:
URL: https://github.com/apache/kafka/pull/14655#issuecomment-1783829648

   No, this is not backwards compatible since it changes the semantics of the `replicaAssignments` method.
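
   To illustrate why a semantic change can break compatibility without any signature change: assuming the current contract, in which exactly one of the partition count and the replica assignments is non-null, an existing policy may use the nullness of the assignment map as a signal. The types below are hypothetical, simplified stand-ins, not the real `CreateTopicPolicy.RequestMetadata` API.

   ```java
   import java.util.List;
   import java.util.Map;

   final class PolicyCompatSketch {
       private PolicyCompatSketch() {}

       // Hypothetical, simplified stand-in for CreateTopicPolicy.RequestMetadata.
       static final class Metadata {
           final Integer numPartitions;                          // null if assignments were given
           final Map<Integer, List<Integer>> replicaAssignments; // null if numPartitions was given

           Metadata(Integer numPartitions, Map<Integer, List<Integer>> replicaAssignments) {
               this.numPartitions = numPartitions;
               this.replicaAssignments = replicaAssignments;
           }
       }

       // An existing (hypothetical) policy that forbids manual replica placement by
       // relying on "a non-null assignment map means the client chose the replicas".
       static boolean allowed(Metadata request) {
           return request.replicaAssignments == null;
       }
   }
   ```

   Under the current contract, `new Metadata(3, null)` is allowed. If the broker always filled in its computed assignment, the same client request would arrive with a non-null map and be rejected, even though the client never chose any replicas.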





Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-28 Thread via GitHub


ijuma merged PR #14591:
URL: https://github.com/apache/kafka/pull/14591





Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-28 Thread via GitHub


ijuma commented on PR #14591:
URL: https://github.com/apache/kafka/pull/14591#issuecomment-1783828937

   The JDK 8 build passed; the rest had a number of unrelated flaky failures:
   
   > Build / JDK 21 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails(String).quorum=kraft
   Build / JDK 21 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[2] Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT
   Build / JDK 21 and Scala 2.13 / org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
   Build / JDK 21 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddNamedTopologyToRunningApplicationWithSingleInitialNamedTopology()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testNoOpRecordWriteAfterTimeout()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testSendOffsetsWithGroupId(String).quorum=zk
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2]
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once, processing threads = false]
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing threads = false]
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Isolated, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.7-IV1, Security=PLAINTEXT
   





[jira] [Commented] (KAFKA-15687) Update host address for the GoupMetadata when replace static members

2023-10-28 Thread Yu Wang (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780620#comment-17780620 ] 

Yu Wang commented on KAFKA-15687:
-

[~sagarrao] With *group.instance.id*, the member id always starts with the instance id (which, in our case, does not change when the pod is replaced). So the member id cannot help us find the consumer instance after the pod is replaced.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L419]
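
The point about member ids can be illustrated with a rough sketch of how, per our reading of the linked `GroupMetadata` code, a static member's id is derived: the `group.instance.id` followed by a delimiter and a random UUID. The `-` delimiter and the helper name are assumptions for illustration. Because the prefix is identical for the old and the replacement pod, the id by itself does not reveal which pod (or host) currently holds the membership.

```java
import java.util.UUID;

final class StaticMemberIdSketch {
    private StaticMemberIdSketch() {}

    // Assumed scheme: "<group.instance.id>-<random UUID>". Only the random
    // suffix differs between the crashed pod and its replacement.
    static String memberIdFor(String groupInstanceId) {
        return groupInstanceId + "-" + UUID.randomUUID();
    }
}
```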

 

> Update host address for the GoupMetadata when replace static members
> 
>
> Key: KAFKA-15687
> URL: https://issues.apache.org/jira/browse/KAFKA-15687
> Project: Kafka
>  Issue Type: Improvement
>  Components: group-coordinator
>Affects Versions: 3.6.0
>Reporter: Yu Wang
>Priority: Major
>
> We are trying to use the static membership protocol for our consumers in 
> Kubernetes. When our pod was recreated, we found that the host address in the 
> group description did not change to the address of the newly created pod.
> For example, we have one pod with *group.instance.id = id1 and ip = 
> 192.168.0.1*. When the pod crashes, we replace it with a new pod with the 
> same *group.instance.id = id1* but a different {*}ip = 192.168.0.2{*}. After 
> the new pod joined the consumer group, the "describe group" command 
> still showed the host as {*}192.168.0.1{*}. This makes it impossible to find the 
> correct consumer instance when investigating an issue.
> After reading the source code, we found that the groupCoordinator does not 
> change the host address for the same {*}groupInstanceId{*}.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L316]
> Is it also possible to replace the host address when replacing the static 
> member?
>  
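
The request in the description can be sketched as a toy registry keyed by `group.instance.id`. This is a hypothetical model, not the group coordinator's data structures: `replaceKeepingHost` mimics the behaviour described above (the new member id is adopted but the old host is kept), while `replaceUpdatingHost` is the proposed alternative.

```java
import java.util.HashMap;
import java.util.Map;

final class StaticMemberRegistrySketch {
    // Hypothetical member record: the id and the host shown by "describe group".
    static final class Member {
        final String memberId;
        final String host;

        Member(String memberId, String host) {
            this.memberId = memberId;
            this.host = host;
        }
    }

    private final Map<String, Member> byInstanceId = new HashMap<>();

    void join(String groupInstanceId, String memberId, String host) {
        byInstanceId.put(groupInstanceId, new Member(memberId, host));
    }

    // Behaviour described in the issue: newHost is deliberately ignored,
    // so the replacement keeps the crashed pod's host.
    void replaceKeepingHost(String groupInstanceId, String newMemberId, String newHost) {
        Member old = byInstanceId.get(groupInstanceId);
        byInstanceId.put(groupInstanceId, new Member(newMemberId, old.host));
    }

    // Proposed behaviour: the replacement also records the new pod's host.
    void replaceUpdatingHost(String groupInstanceId, String newMemberId, String newHost) {
        byInstanceId.put(groupInstanceId, new Member(newMemberId, newHost));
    }

    String hostOf(String groupInstanceId) {
        return byInstanceId.get(groupInstanceId).host;
    }
}
```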



--
This message was sent by Atlassian Jira
(v8.20.10#820010)