Re: [PR] [KAFKA-9965] Fix accumulator tryAppend, so that fresh new producerBatch is created [kafka]

2023-11-01 Thread via GitHub


blue-int commented on PR #12462:
URL: https://github.com/apache/kafka/pull/12462#issuecomment-1790149877

   I want RoundRobinPartitioner to work again. When might this PR be merged and 
released?
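
   For context, a minimal sketch of the setup affected by KAFKA-9965 (bootstrap
   address and serializers are placeholders); the JIRA reports that the
   accumulator can invoke the partitioner a second time when it creates a new
   batch, so RoundRobinPartitioner ends up skipping partitions:
   ```
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.RoundRobinPartitioner;
   import org.apache.kafka.common.serialization.StringSerializer;

   import java.util.Properties;

   // A producer explicitly configured with RoundRobinPartitioner -- the
   // setup where records can land on only a subset of partitions.
   Properties props = new Properties();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
   props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
   ```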


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2023-11-01 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15682:
-
Description: 
One of the implementations of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic, 
{{__remote_log_metadata}}, to store metadata about the remote log segments. 
Unlike other internal topics, which have compaction enabled, this topic does 
not use compaction and its retention is set to unlimited. 

Keeping this internal topic's retention unlimited is not practical in 
real-world use cases, where the topic's local disk usage footprint grows huge 
over time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on that assumption; we need an assertion to ensure that the internal 
{{__remote_log_metadata}} segments are not eligible for deletion before the 
expiry of all the relevant user-topic uploaded remote-log-segments. Otherwise 
there will be dangling remote-log-segments which won't be cleared once all the 
brokers are restarted after the internal topic retention cleanup.

See the discussion thread: 
https://github.com/apache/kafka/pull/14576#discussion_r1368576126

  was:
One of the implementations of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic, 
{{__remote_log_metadata}}, to store metadata about the remote log segments. 
Unlike other internal topics, which have compaction enabled, this topic does 
not use compaction and its retention is set to unlimited. 

Keeping this internal topic's retention unlimited is not practical in 
real-world use cases, where the topic's local disk usage footprint grows huge 
over time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on that assumption; we need an assertion to ensure that the internal 
{{__remote_log_metadata}} segments are not eligible for deletion before the 
expiry of all the relevant user-topic uploaded remote-log-segments. Otherwise 
there will be dangling remote-log-segments which won't be cleared once all the 
brokers are restarted after the internal topic retention cleanup.


> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> One of the implementations of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic, 
> {{__remote_log_metadata}}, to store metadata about the remote log 
> segments. Unlike other internal topics, which have compaction enabled, this 
> topic does not use compaction and its retention is set to unlimited. 
> Keeping this internal topic's retention unlimited is not practical in 
> real-world use cases, where the topic's local disk usage footprint grows 
> huge over time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on that assumption; we need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant user-topic uploaded remote-log-segments. 
> Otherwise there will be dangling remote-log-segments which won't be cleared 
> once all the brokers are restarted after the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126
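
To make the proposed assertion concrete, a minimal sketch (hypothetical code
using the Admin client; topic names and the user-topic maximum are
placeholders) of comparing the internal topic's retention against user-topic
retention:
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Properties;

public class RemoteLogMetadataRetentionCheck {

    // Fetch retention.ms for a topic (-1 means unlimited retention).
    static long retentionMs(Admin admin, String topic) throws Exception {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Config config = admin.describeConfigs(Collections.singleton(resource))
                .all().get().get(resource);
        return Long.parseLong(config.get(TopicConfig.RETENTION_MS_CONFIG).value());
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            long internal = retentionMs(admin, "__remote_log_metadata");
            // In practice this would be the max over all user topics.
            long maxUser = retentionMs(admin, "user-topic");
            if (internal != -1 && internal <= maxUser) {
                throw new IllegalStateException(
                        "__remote_log_metadata may expire before user-topic segments");
            }
        }
    }
}
{code}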



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]

2023-11-01 Thread via GitHub


dengziming commented on PR #14595:
URL: https://github.com/apache/kafka/pull/14595#issuecomment-1790107126

   Hello @cmccabe @jsancio, no flaky tests were found in the latest 4 builds; 
please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]

2023-11-01 Thread via GitHub


dengziming commented on PR #14595:
URL: https://github.com/apache/kafka/pull/14595#issuecomment-1790105902

   I triggered the CI 5 times; only `ConsumerBounceTest` is flaky and all other 
tests succeed. I have reverted `ConsumerBounceTest` and will convert it in the 
future.
   
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//#showFailuresLink
   ```
   [Build / JDK 17 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_17_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft/)
   [Build / JDK 17 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_17_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft_2/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft_2/)
   [Build / JDK 8 and Scala 2.12 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor_String__quorum_kraft/)
   [Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_8_and_Scala_2_12___testConsumptionWithBrokerFailures_String__quorum_zk/)
   [Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_8_and_Scala_2_12___testConsumptionWithBrokerFailures_String__quorum_kraft/)
   [Build / JDK 21 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_21_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft/)
   [Build / JDK 21 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_21_and_Scala_2_13___testTrustStoreAlter_String__quorum_kraft/)
   ```
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//#showFailuresLink
   ```
   [Build / JDK 21 and Scala 2.13 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/2//testReport/junit/kafka.server/DescribeClusterRequestTest/Build___JDK_21_and_Scala_2_13___testDescribeClusterRequestIncludingClusterAuthorizedOperations_String__quorum_kraft/)
   [Build / JDK 21 and Scala 2.13 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/2//testReport/junit/kafka.server/DescribeClusterRequestTest/Build___JDK_21_and_Scala_2_13___testDescribeClusterRequestIncludingClusterAuthorizedOperations_String__quorum_kraft_2/)
   ```
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/3//#showFailuresLink
   ```
   [Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/3//testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testBumpTransactionalEpoch_String__quorum_kraft/)
   ```
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/4//#showFailuresLink
   
   ```
   [Build / JDK 8 and Scala 2.12 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/ka

[jira] [Commented] (KAFKA-15536) dynamically resize remoteIndexCache

2023-11-01 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781999#comment-17781999
 ] 

Henry Cai commented on KAFKA-15536:
---

Can this be backported to the 3.6 branch?

> dynamically resize remoteIndexCache
> ---
>
> Key: KAFKA-15536
> URL: https://issues.apache.org/jira/browse/KAFKA-15536
> Project: Kafka
>  Issue Type: Improvement
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: hudeqi
>Priority: Major
> Fix For: 3.7.0
>
>
> context:
> https://github.com/apache/kafka/pull/14243#discussion_r1320630057



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15632) Drop the invalid remote log metadata events

2023-11-01 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781998#comment-17781998
 ] 

Henry Cai commented on KAFKA-15632:
---

Can this fix be backported to the 3.6 branch? [~showuon] [~ckamal] 

> Drop the invalid remote log metadata events 
> 
>
> Key: KAFKA-15632
> URL: https://issues.apache.org/jira/browse/KAFKA-15632
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> The {{__remote_log_metadata}} topic cleanup policy is set to {{DELETE}} and 
> the default retention is set to unlimited.
> The expectation is that the user will configure the maximum retention time 
> for this internal topic compared to all the other user-created topics in the 
> cluster. We cannot keep it unlimited, as the contents of this internal 
> topic need to be in the local storage.
> The RemoteLogMetadata cache expects the events to be in the order of 
> [RemoteLogSegmentState#isValidTransition|https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java#L93]
> Once retention expires for this topic, say after 30 days due to a size/time 
> breach, there can be partial metadata events and the 
> [cache|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java#L160]
>  starts to throw RemoteResourceNotFoundError.
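
For illustration, a minimal sketch (hypothetical guard, not the actual patch)
of dropping events whose transition fails the `isValidTransition` check
referenced above:
{code:java}
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;

public class InvalidEventFilter {

    // Drop metadata events that arrive out of order (e.g. a
    // COPY_SEGMENT_FINISHED whose COPY_SEGMENT_STARTED predecessor was
    // lost to retention cleanup) instead of failing the cache.
    static boolean shouldApply(RemoteLogSegmentState current, RemoteLogSegmentState next) {
        return RemoteLogSegmentState.isValidTransition(current, next);
    }
}
{code}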



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-11-01 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781751#comment-17781751
 ] 

Arpit Goyal edited comment on KAFKA-15388 at 11/2/23 5:04 AM:
--

[~divijvaidya] I am not getting the archival functionality. By archival, do 
you mean once the segments have expired because of the retention mechanism?

Do you mean we should take care of the endOffset value while reading it from 
the remote storage, i.e., read the offset from the next segment's base offset 
instead of manipulating it using endOffset? I found two usages where we are 
using it:
1. While updating the logStartOffset during cleanup of the log segment based 
on retention: 
https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L827
2. During the read path of the remote storage: 
https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1263
Do we need to correct it at both places?



was (Author: JIRAUSER301926):
[~divijvaidya] I am not getting the archival functionality. By archival, do 
you mean copying segments would not be impacted?

Do you mean we should take care of the endOffset value while reading it from 
the remote storage, i.e., read the offset from the next segment's base offset 
instead of manipulating it using endOffset? I found two usages where we are 
using it:
1. While updating the logStartOffset during cleanup of the log segment based 
on retention: 
https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L827
2. During the read path of the remote storage: 
https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1263
Do we need to correct it at both places?


> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Blocker
> Fix For: 3.7.0
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offsets from the baseOffset 
> of the segment until the next segment's baseOffset. There are no gaps in 
> offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offsets after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}
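
For illustration, a minimal sketch (hypothetical types; the real metadata
class is RemoteLogSegmentMetadata) of the proposed fix for (2), skipping
offset gaps by continuing with the next segment:
{code:java}
import java.util.List;
import java.util.Optional;

// Hypothetical segment metadata holder for the sketch.
record SegmentMetadata(long baseOffset, long endOffset) {}

class SegmentLookup {
    // Pick the segment to serve fetchOffset from; if the offset falls into a
    // gap left by compaction, continue with the next segment's records.
    static Optional<SegmentMetadata> segmentFor(List<SegmentMetadata> segments, long fetchOffset) {
        for (SegmentMetadata s : segments) { // sorted by baseOffset
            if (fetchOffset <= s.endOffset()) {
                return Optional.of(s); // may begin at s.baseOffset() > fetchOffset
            }
        }
        return Optional.empty();
    }
}
{code}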



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1379595660


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
+ * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+private final MetricKey key;
+private final Metric.Builder metricBuilder;
+
+private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+this.key = key;
+this.metricBuilder = metricBuilder;
+}
+
+@Override
+public MetricKey key() {
+return key;
+}
+
+public Metric.Builder metric() {
+return metricBuilder;
+}
+
+public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+return new SinglePointMetric(metricKey, metric);
+}
+
+/*
+Methods to construct gauge metric type.
+ */
+public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+/*
+Methods to construct sum metric type.
+ */
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+return sum(metricKey, value, monotonic, timestamp, null);
+}
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+if (startTimestamp != null) {
+point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+}
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+}
+
+public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+Instant timestamp, Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value)

Review Comment:
   Agree with Apoorv. We should have checks for anything "public" but not 
necessarily _everywhere_.
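
   As an editorial aside, a brief usage sketch of the factory methods in this
   class (the `MetricKey` constructor shown is an assumption):
   ```
   import java.time.Instant;
   import java.util.Collections;

   // Hypothetical usage: build a gauge point and a monotonic cumulative
   // sum point for export; metric names are illustrative.
   MetricKey key = new MetricKey("process-cpu-usage", Collections.emptyMap());
   SinglePointMetric gauge = SinglePointMetric.gauge(key, 12.5, Instant.now());
   SinglePointMetric total = SinglePointMetric.sum(key, 100.0, true, Instant.now());
   ```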



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1379594061


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+    private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+    /**
+     * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one.
+     *
+     * @param metricKey the key for which to calculate a getAndSet.
+     * @param now the timestamp for the new value.
+     * @param value the current value.
+     * @return the timestamp of the previous entry and its value. If there
+     * isn't a previous entry, then this method returns {@link Optional#empty()}
+     */
+    public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+        InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+        AtomicReference<InstantAndValue<T>> valueOrNull = counters

Review Comment:
   Still not sure I can follow -- I don't see any code that would return the 
value of the `counters` map (only the `remove`, but given that the map is 
thread safe, it's not possible that we would return it to two threads -- a 
second thread would get a `null` back). And the put is protected by the 
concurrent map.
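
   For context, a brief sketch of how a caller might use `getAndSet` to turn a
   cumulative value into a delta (hypothetical usage; the `InstantAndValue`
   accessor name and the `MetricKey` constructor are assumptions):
   ```
   import java.time.Instant;
   import java.util.Collections;
   import java.util.Optional;

   // Derive a delta from a cumulative counter using the previous value
   // recorded for the same MetricKey.
   LastValueTracker<Double> tracker = new LastValueTracker<>();
   MetricKey key = new MetricKey("connection-close-total", Collections.emptyMap());

   double current = 42.0;
   Optional<LastValueTracker.InstantAndValue<Double>> previous =
           tracker.getAndSet(key, Instant.now(), current);
   double delta = previous.map(p -> current - p.getValue()).orElse(current);
   ```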



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1379594061


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+    private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+    /**
+     * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one.
+     *
+     * @param metricKey the key for which to calculate a getAndSet.
+     * @param now the timestamp for the new value.
+     * @param value the current value.
+     * @return the timestamp of the previous entry and its value. If there
+     * isn't a previous entry, then this method returns {@link Optional#empty()}
+     */
+    public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+        InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+        AtomicReference<InstantAndValue<T>> valueOrNull = counters

Review Comment:
   Still not sure I can follow -- I don't see any code that would return the 
value of the `counters` map. And the put is protected by the concurrent map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1374059672


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * <ul>
+ *   <li>{@link Gauge}</li>
+ *   <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed, as the sample
+ * values might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric for
+ * "connection-close", but it has to be created with a "connection-close-rate" metric of type rate
+ * and a "connection-close-total" metric of type total. So, we will never get a KafkaExporter
+ * metric with type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is
+ * registered, each member Frequency object is converted into an anonymous Measurable and
+ * registered. So, a Frequencies metric is reported with a set of measurables with
+ * name = Frequency.name(). As there is no way to figure out the compound type, each component
+ * measurable is converted to a GAUGE_DOUBLE.
+ *
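
   To make the Meter behavior described above concrete, a minimal sketch
   (illustrative names, not from this PR) of how a Meter registers as two
   component metrics:
   ```
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Meter;

   // A Meter surfaces as a rate plus a cumulative total, so a collector
   // only ever observes the component metrics, never a "Meter" type.
   Metrics metrics = new Metrics();
   Sensor sensor = metrics.sensor("connection-close");
   sensor.add(new Meter(
           metrics.metricName("connection-close-rate", "connection-metrics"),
           metrics.metricName("connection-close-total", "connection-metrics")));
   sensor.record(); // updates both the rate and the total
   ```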

[jira] [Resolved] (KAFKA-15669) Implement telemetry naming strategy

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15669.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Implement telemetry naming strategy
> ---
>
> Key: KAFKA-15669
> URL: https://issues.apache.org/jira/browse/KAFKA-15669
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> Define classes and implement the telemetry metrics naming strategy for 
> KIP-714 as defined here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat]
>  
> The naming strategy must also support delta temporality metrics with a suffix 
> in the original metric name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15668: Adding Opentelemetry shadowed library (KIP-714) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on PR #14618:
URL: https://github.com/apache/kafka/pull/14618#issuecomment-1790065635

   Thanks for all the review help @xvrl!
   
   Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15668) Add Opentelemetry Proto library with shadowed classes

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15668.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Add Opentelemetry Proto library with shadowed classes
> -
>
> Key: KAFKA-15668
> URL: https://issues.apache.org/jira/browse/KAFKA-15668
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> KIP-714 requires the addition of a [Java client 
> dependency|https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Javaclientdependencies]
>  on {{opentelemetry-proto}}, which also brings a transitive dependency on 
> {{protobuf-java}}. The dependencies should be shadowed to avoid JVM 
> versioning conflicts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15668: Adding Opentelemetry shadowed library (KIP-714) [kafka]

2023-11-01 Thread via GitHub


mjsax merged PR #14618:
URL: https://github.com/apache/kafka/pull/14618


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15668: Adding Opentelemetry shadowed library (KIP-714) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1379585726


##
build.gradle:
##
@@ -295,7 +296,12 @@ subprojects {
   }
   publications {
 mavenJava(MavenPublication) {
-  from components.java
+  if (!shouldPublishWithShadow) {
+from components.java
+  } else {
+apply plugin: 'com.github.johnrengelman.shadow'

Review Comment:
   Thanks. Yes, I already understood what the plugin does.
   
   > This dependency is not introduced in this PR rather being already used in 
Kafka for a while:
   
   This was my concern, but if it's already established my concerns are void.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: [Minor] Renaming getter and correcting code comments [kafka]

2023-11-01 Thread via GitHub


mjsax commented on PR #14684:
URL: https://github.com/apache/kafka/pull/14684#issuecomment-1790059069

   Thanks for the follow up. Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: [Minor] Renaming getter and correcting code comments [kafka]

2023-11-01 Thread via GitHub


mjsax merged PR #14684:
URL: https://github.com/apache/kafka/pull/14684


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14957: Update-Description-String [kafka]

2023-11-01 Thread via GitHub


mjsax commented on PR #13909:
URL: https://github.com/apache/kafka/pull/13909#issuecomment-1790049927

   Seems I missed your last comment -- sorry about that. The bot comment just 
surfaced to me.
   
   > May I ask how do we want to change the default value? I mean, if we are to 
provide something custom instead of using the default value. What would be that 
custom thingy?
   
   Note, we only want a way to overwrite what we put into the docs when 
generating them. Right now we call `config(configName)` and it gives us the 
default value (which might depend on the machine we are running on).
   
   For the html generation part, we want something like `html(configName)` 
which would be something like:
   ```
   if (htmlOverwrite(configName) != null) {
 return htmlOverwrite(configName);
   } else {
 return config(configName);
   }
   ```
   
   For most configs no html overwrite would exist, but for the `state.dir` 
config we would set it to something meaningful that makes sense in the docs, 
and we need some code to actually add an optional html overwrite when we 
define a config:
   ```
   CONFIG = new ConfigDef()
       .define(STATE_DIR_CONFIG,
               Type.STRING,
               System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams",
               "${java.io.tmpdir}", // <-- new parameter to pass in the html overwrite
               Importance.HIGH,
               STATE_DIR_DOC)
   ```
   
   Does this help?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KIP-909 [kafka]

2023-11-01 Thread via GitHub


philipnee opened a new pull request, #14691:
URL: https://github.com/apache/kafka/pull/14691

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15765) Remove task level metric "commit-latency"

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15765:
---

Assignee: Bruno Cadonna

> Remove task level metric "commit-latency"
> -
>
> Key: KAFKA-15765
> URL: https://issues.apache.org/jira/browse/KAFKA-15765
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 4.0.0
>
>
> We stopped tracking this metric with KIP-447, but kept it for backward 
> compatibility reasons. It's time to remove it completely with 4.0 release.
> Cf 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics]
> And [https://github.com/apache/kafka/pull/8218/files#r390712211]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379562125


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
 
-    ProducerConfig(Map<?, ?> props, boolean doLog) {
+    protected ProducerConfig(Map<?, ?> props, boolean doLog) {

Review Comment:
   > protected is public API for classes where inheritance is allowed.
   
   While technically correct, I don't think that `ProducerConfig` is a class 
that is _intended_ to be extended (and we cannot make it final because we want 
to extend it internally). As a matter of fact, `ProducerConfig` should not 
even be instantiated by a user; it's only public to give access to the config 
definitions, which are all `public static final String ...` -- all the other 
methods it has (`configName()` etc.) are also public only for practical 
reasons and should actually be hidden from the user. It's a very leaky 
abstraction from an API POV -- we actually have a similar issue for 
`StreamsConfig` (even worse in that case), for which I am considering doing a 
KIP to clean it up: https://github.com/apache/kafka/pull/14548
   
   > Pointing to a commit where you did something doesn't quite provide 
evidence that it's following the process. ;)
   
   Sure, but I wanted to point out that many people did consider this ok in the 
past, so why do we think it was not ok...?
   
   If you are not going to revert this change if we merge it, I would just move 
forward with this PR as-is, not doing a KIP (as it's highly questionable)... If 
you insist on a KIP, Sophie, please just write one and skip the discussion so 
we can VOTE on it directly -- but it seems like totally unnecessary work to me.
   
   And if you know me, I am _always_ in favor of doing things the right way if 
there is the slightest reason for it -- but for this case, I cannot see any 
reason for a KIP at all.
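
   For context, a minimal sketch of what the `protected` constructor enables
   (hypothetical subclass, not from the PR):
   ```
   import org.apache.kafka.clients.producer.ProducerConfig;

   import java.util.Map;

   // An external extension that suppresses config logging via the
   // now-protected constructor.
   public class QuietProducerConfig extends ProducerConfig {
       public QuietProducerConfig(final Map<?, ?> props) {
           super(props, false); // doLog = false
       }
   }
   ```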



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379562125


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
 
-    ProducerConfig(Map<?, ?> props, boolean doLog) {
+    protected ProducerConfig(Map<?, ?> props, boolean doLog) {

Review Comment:
   > protected is public API for classes where inheritance is allowed.
   
   While technically correct, I don't think that `ProducerConfig` is a class 
that is _intended_ to be extended (and we cannot make it final because we want 
to extend it internally). As a matter of fact, `ProducerConfig` should not 
even be instantiated by a user; it's only public to give access to the config 
definitions, which are all `public static final String ...` -- all the other 
methods it has (`configName()` etc.) are also public only for practical 
reasons and should actually be hidden from the user. It's a very leaky 
abstraction from an API POV -- we actually have a similar issue for 
`StreamsConfig` (even worse in that case), for which I am considering doing a 
KIP to clean it up: https://github.com/apache/kafka/pull/14548
   
   > Pointing to a commit where you did something doesn't quite provide 
evidence that it's following the process. ;)
   
   Sure, but I wanted to point out that many people did consider this ok in the 
past, so why do we think it was not ok...?
   
   If you are not going to revert this change if we merge it, I would just move 
forward with this PR as-is, not doing a KIP (as it's highly questionable)... If 
you insist on a KIP, Sophie, please just write one and skip the discussion so 
we can VOTE on it directly -- but it seems like totally unnecessary work to me.
   
   And if you know me, I am _always_ in favor of doing things the right way if 
there is the slightest reason for it -- but for this case, I cannot see any 
reason for a KIP at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379562125


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
 
-    ProducerConfig(Map<?, ?> props, boolean doLog) {
+    protected ProducerConfig(Map<?, ?> props, boolean doLog) {

Review Comment:
   > protected is public API for classes where inheritance is allowed.
   
   While technically correct, I don't think that `ProducerConfig` is a class 
that is _intended_ to be extended (and we cannot make it final because we want 
to extend it internally). As a matter of fact, `ProducerConfig` should not 
even be instantiated by a user; it's only public to give access to the config 
definitions, which are all `public static final String ...` -- all the other 
methods it has (`configName()` etc.) are also public only for practical 
reasons and should actually be hidden from the user. It's a very leaky 
abstraction from an API POV -- we actually have a similar issue for 
`StreamsConfig` (even worse in that case), for which I am considering doing a 
KIP to clean it up: https://github.com/apache/kafka/pull/14548
   
   > Pointing to a commit where you did something doesn't quite provide 
evidence that it's following the process. ;)
   
   Sure, but I wanted to point out that many people did consider this ok in the 
past, so why do we think it was not ok...?
   
   If you are not going to revert this change if we merge it, I would just move 
forward with this PR as-is, not doing a KIP (as it's highly questionable)... If 
you insist on a KIP, Sophie, please just write one and skip the discussion so 
we can VOTE on it directly -- but it seems like totally unnecessary work to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1379530923


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -127,14 +131,31 @@ public <R> QueryResult<R> query(
         final QueryConfig config) {
 
 
+
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
-        final QueryResult<R> result = store.query(query, positionBound, config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        final Position position = result.getPosition();
+
+        if (result.isSuccess()) {
+            if (result.getResult() instanceof byte[]) {
+                final byte[] plainValue = (byte[]) result.getResult();
+                final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue);
+                result = (QueryResult<R>) QueryResult.forResult(valueWithTimestamp);

Review Comment:
   Should we use `InternalQueryResultUtil.copyAndSubstituteDeserializedResult` 
instead, to create a copy of the result correctly?



##
streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyQuery.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.util.Objects;
+
+/**
+ * Interactive query for retrieving a single record based on its key.
+ */
+@Evolving
+public final class TimestampedKeyQuery<K, V> implements Query<ValueAndTimestamp<V>> {
+
+    private final K key;
+    private final boolean skipCache;
+
+    private TimestampedKeyQuery(final K key, final boolean skipCache) {
+        this.key = Objects.requireNonNull(key);

Review Comment:
   I think this check should go into `withKey()` -- we should also add an error 
message
   
   (Might be good to clean this up, inside `KeyQuery`, too)
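
   As an editorial aside, a minimal IQv2 usage sketch of the proposed query
   type (assuming a `withKey` factory analogous to `KeyQuery`; the store name
   and the `streams` instance are placeholders):
   ```
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.query.StateQueryRequest;
   import org.apache.kafka.streams.query.StateQueryResult;
   import org.apache.kafka.streams.query.TimestampedKeyQuery;
   import org.apache.kafka.streams.state.ValueAndTimestamp;

   // Query a timestamped key-value store for a single key, getting back
   // the value together with its record timestamp.
   TimestampedKeyQuery<String, Long> query = TimestampedKeyQuery.withKey("key");
   StateQueryRequest<ValueAndTimestamp<Long>> request =
           StateQueryRequest.inStore("my-store").withQuery(query);
   StateQueryResult<ValueAndTimestamp<Long>> result = streams.query(request);
   ```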



##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -180,5 +201,38 @@ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(
 public long approximateNumEntries() {
 return store.approximateNumEntries();
 }
+}
 
+class WrappedRocksDbIterator extends AbstractIterator implements 
ManagedKeyValueIterator {

Review Comment:
   Wondering if we should only `implements ManagedKeyValueIterator` 🤔 -- what do we gain/need from `AbstractIterator`?
   
   We should for sure set the correct types:
   ```
   ... extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]>
   ```
   to fix the `Object` return type on some methods.
   
   
   
   



##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -180,5 +201,38 @@ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(
 public long approximateNumEntries() {
 return store.approximateNumEntries();
 }
+}
 
+class WrappedRocksDbIterator extends AbstractIterator implements 
ManagedKeyValueIterator {

Review Comment:
   Wondering if we should only `implements ManagedKeyValueIterator` 🤔 -- what do we gain/need from `AbstractIterator`?
   
   We should for sure set the correct types:
   ```
   ... extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]>
   ```
   to fix the `Object` return type on some methods.
   
   
   
   



##
streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyQuery.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+

Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1379554731


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -127,14 +131,31 @@ public <R> QueryResult<R> query(
         final QueryConfig config) {
 
 
+
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
-        final QueryResult<R> result = store.query(query, positionBound, config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        final Position position = result.getPosition();
+
+        if (result.isSuccess()) {
+            if (result.getResult() instanceof byte[]) {
+                final byte[] plainValue = (byte[]) result.getResult();
+                final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue);
+                result = (QueryResult<R>) QueryResult.forResult(valueWithTimestamp);
+            }
+            if (result.getResult() instanceof RocksDbIterator) {
+                final WrappedRocksDbIterator wrappedRocksDBRangeIterator = new WrappedRocksDbIterator((RocksDbIterator) result.getResult());
+                result = (QueryResult<R>) QueryResult.forResult(wrappedRocksDBRangeIterator);
+            }

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1379550244


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -127,14 +131,31 @@ public <R> QueryResult<R> query(
         final QueryConfig config) {
 
 
+
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
-        final QueryResult<R> result = store.query(query, positionBound, config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        final Position position = result.getPosition();
+
+        if (result.isSuccess()) {
+            if (result.getResult() instanceof byte[]) {
+                final byte[] plainValue = (byte[]) result.getResult();
+                final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue);
+                result = (QueryResult<R>) QueryResult.forResult(valueWithTimestamp);
+            }
+            if (result.getResult() instanceof RocksDbIterator) {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1379550064


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -127,14 +131,31 @@ public <R> QueryResult<R> query(
         final QueryConfig config) {
 
 
+
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
-        final QueryResult<R> result = store.query(query, positionBound, config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        final Position position = result.getPosition();
+
+        if (result.isSuccess()) {
+            if (result.getResult() instanceof byte[]) {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


ijuma commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379548300


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
 
-    ProducerConfig(Map<?, ?> props, boolean doLog) {
+    protected ProducerConfig(Map<?, ?> props, boolean doLog) {

Review Comment:
   Private methods are not part of the API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-11-01 Thread via GitHub


philipnee commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1789989071

   Test failures seem unrelated, but retriggering the tests as there's a failed 
build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1378442872


##
streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+
+import java.util.Optional;
+
+/**
+ * Interactive query for issuing range queries and scans over KeyValue stores.
+ * 
+ *  A range query retrieves a set of records, specified using an upper and/or 
lower bound on the keys.
+ * 
+ * A scan query retrieves all records contained in the store.
+ * 

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1367565896


##
streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+
+import java.util.Optional;
+
+/**
+ * Interactive query for issuing range queries and scans over KeyValue stores.
+ * 
+ *  A range query retrieves a set of records, specified using an upper and/or 
lower bound on the keys.
+ * 
+ * A scan query retrieves all records contained in the store.
+ * 

Review Comment:
   Unnecessary tag. -- Also, generic parameters should be described -- we should 
also describe ordering guarantees better (we can wait for your other PR to get 
finished, and mimic here what we did there).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15110:Fix the fail start due to the existence of multiple version packages under libs [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #13893: KAFKA-15110:Fix the fail start due to the 
existence of multiple version packages under libs
URL: https://github.com/apache/kafka/pull/13893


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15129;[9/N] Remove metrics in network package when broker shutdown [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #14043: KAFKA-15129;[9/N] Remove metrics in network 
package when broker shutdown
URL: https://github.com/apache/kafka/pull/14043


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15129;[6/N] Remove metrics in ControllerStats when broker shutdown [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #13960: KAFKA-15129;[6/N] Remove metrics in 
ControllerStats when broker shutdown
URL: https://github.com/apache/kafka/pull/13960


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15129;[8/N] Remove metrics in KafkaRequestHandlerPool when broker shutdown [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #14015: KAFKA-15129;[8/N] Remove metrics in 
KafkaRequestHandlerPool when broker shutdown
URL: https://github.com/apache/kafka/pull/14015


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15129;[10/N] Remove metrics in log when broker shutdown [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #14058: KAFKA-15129;[10/N] Remove metrics in log 
when broker shutdown
URL: https://github.com/apache/kafka/pull/14058


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15129;[5/N] Remove metrics in ControllerChannelManager when broker shutdown [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #13959: KAFKA-15129;[5/N] Remove metrics in 
ControllerChannelManager when broker shutdown
URL: https://github.com/apache/kafka/pull/13959


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15129;[4/N] Remove metrics in Partition when broker shutdown and stopPartitions [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #13952: KAFKA-15129;[4/N] Remove metrics in 
Partition when broker shutdown and stopPartitions
URL: https://github.com/apache/kafka/pull/13952


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #13929: KAFKA-15129;[3/N] Remove metrics in 
AbstractFetcherManager when fetcher manager instance shutdown
URL: https://github.com/apache/kafka/pull/13929


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown [kafka]

2023-11-01 Thread via GitHub


hudeqi closed pull request #13926: KAFKA-15129;[2/N] Remove metrics in 
GroupMetadataManager when shutdown
URL: https://github.com/apache/kafka/pull/13926


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1379524385


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -127,14 +131,31 @@ public <R> QueryResult<R> query(
 final QueryConfig config) {
 
 
+
 final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
-final QueryResult<R> result = store.query(query, positionBound, config);
+QueryResult<R> result = store.query(query, positionBound, config);
+final Position position = result.getPosition();
+
+if (result.isSuccess()) {
+if (result.getResult() instanceof byte[]) {
+final byte[] plainValue = (byte[]) result.getResult();
+final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue);
+result = (QueryResult<R>) QueryResult.forResult(valueWithTimestamp);
+}
+if (result.getResult() instanceof RocksDbIterator) {
+final WrappedRocksDbIterator wrappedRocksDBRangeIterator = new WrappedRocksDbIterator((RocksDbIterator) result.getResult());
+result = (QueryResult<R>) QueryResult.forResult(wrappedRocksDBRangeIterator);
+}

Review Comment:
   We should chain this with an `else` that throws an exception (we should 
never hit the `else`; hitting it would indicate a bug, and throwing an 
exception would surface that bug clearly).
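   
   A minimal sketch of the suggested shape (names taken from the diff above; the 
exception type is illustrative, not a final choice):
   ```
   if (result.getResult() instanceof byte[]) {
       final byte[] plainValue = (byte[]) result.getResult();
       result = (QueryResult<R>) QueryResult.forResult(convertToTimestampedFormat(plainValue));
   } else if (result.getResult() instanceof RocksDbIterator) {
       result = (QueryResult<R>) QueryResult.forResult(
           new WrappedRocksDbIterator((RocksDbIterator) result.getResult()));
   } else {
       // should never happen -- failing loudly surfaces the bug instead of hiding it
       throw new IllegalStateException("Unexpected result type: " + result.getResult().getClass());
   }
   ```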



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1379523818


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -127,14 +131,31 @@ public <R> QueryResult<R> query(
 final QueryConfig config) {
 
 
+
 final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
-final QueryResult<R> result = store.query(query, positionBound, config);
+QueryResult<R> result = store.query(query, positionBound, config);
+final Position position = result.getPosition();
+
+if (result.isSuccess()) {
+if (result.getResult() instanceof byte[]) {
+final byte[] plainValue = (byte[]) result.getResult();
+final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue);
+result = (QueryResult<R>) QueryResult.forResult(valueWithTimestamp);
+}
+if (result.getResult() instanceof RocksDbIterator) {

Review Comment:
   we should chain this with an `else` to the previous `if`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1379523619


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -127,14 +131,31 @@ public <R> QueryResult<R> query(
 final QueryConfig config) {
 
 
+
 final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
-final QueryResult<R> result = store.query(query, positionBound, config);
+QueryResult<R> result = store.query(query, positionBound, config);
+final Position position = result.getPosition();
+
+if (result.isSuccess()) {
+if (result.getResult() instanceof byte[]) {

Review Comment:
   I think we should actually check the input `query` type instead:
   ```
   if (query instanceof KeyQuery || query instanceof TimestampedKeyQuery)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1379511059


##
streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java:
##
@@ -27,6 +27,7 @@
 public final class VersionedRecord<V> {
 private final V value;
 private final long timestamp;
+private final long validTo;

Review Comment:
   Btw: given that `VersionedKeyQuery` also returns a `VersionedRecord`, I 
believe we need to update it to set this timestamp correctly for historic 
queries? If we query "latest" there is nothing to be done, but if we query an 
older version based on an `asOfTimestamp` we might need to take care of this 
case?
   
   Might also be worth mentioning this on the KIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]

2023-11-01 Thread via GitHub


hudeqi commented on PR #14667:
URL: https://github.com/apache/kafka/pull/14667#issuecomment-1789956368

   > > I am restarting the CI. There are large number of test failures 
including some TS ones. Let's wait to have a saner CI build.
   > 
   > seems new CI failed tests are unrelated. @divijvaidya
   
   The only failed TS-related test, 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest, 
passes successfully in the local env.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15602: revert KAFKA-4852 [kafka]

2023-11-01 Thread via GitHub


mjsax commented on PR #14617:
URL: https://github.com/apache/kafka/pull/14617#issuecomment-1789953849

   Merged to `trunk` and cherry-picked to `3.6`, `3.5`, and `3.4` branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1379469860


##
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */

Review Comment:
   add `@param` for key and value types
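   
   For example, mirroring the `@param` pattern used in `VersionedKeyQuery` (a 
sketch, assuming the generic parameters stay `K` and `V`):
   ```
   /**
    * Interactive query for retrieving a set of records with the same specified key
    * and different timestamps within the specified time range.
    *
    * @param <K> The type of the key.
    * @param <V> The type of the value.
    */
   ```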



##
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {
+
+private final K key;
+private final Optional<Instant> fromTime;
+private final Optional<Instant> toTime;
+private final boolean isAscending;
+
+private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+this.key = Objects.requireNonNull(key);
+this.fromTime = fromTime;
+this.toTime = toTime;
+this.isAscending = isAscending;
+}
+
+  /**
+   * Creates a query that will retrieve the set of records identified by {@code key} if any exists
+   * (or {@code null} otherwise).

Review Comment:
   Would we ever return `null` -- I think the iterator would just be empty but 
never `null`?
   
   Can we extend the JavaDocs similarly to `VersionedKeyQuery` and talk about 
the default (query full history) vs. optionally limiting the time range scope?



##
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.an

Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


ableegoldman commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379493839


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map<String, Object> props, boolean doLog) {
+protected ProducerConfig(Map<String, Object> props, boolean doLog) {

Review Comment:
   Interesting, I hadn't heard this definition...I suppose it makes sense in 
general, but I would actually argue that this specific scenario demonstrates an 
edge case where that definition breaks down. Wouldn't this also mean that we 
cannot even add new private constructors to a public class, just because they 
would show up in the javadocs? That doesn't sound right...I can see how one 
could argue that package-private and protected methods are considered part of 
the public API and should not be changed in a backwards incompatible way. But 
private methods have javadocs that would change, yet they are definitely not 
part of the public API. 
   
   (Just curious about this, since I've often wondered if we even have a 
specific policy defining what is/isn't strictly speaking considered a public 
API.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


ijuma commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379486496


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map<String, Object> props, boolean doLog) {
+protected ProducerConfig(Map<String, Object> props, boolean doLog) {

Review Comment:
   Pointing to a commit where you did something doesn't quite provide evidence 
that it's following the process. ;)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


ijuma commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379485880


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map<String, Object> props, boolean doLog) {
+protected ProducerConfig(Map<String, Object> props, boolean doLog) {

Review Comment:
   `protected` is public API for classes where inheritance is allowed.
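   
   For illustration (a contrived sketch; `MyProducerConfig` is hypothetical): a 
subclass in user code can only compile against the constructor if it is at 
least `protected`, so its signature becomes part of the contract:
   ```
   import java.util.Map;
   import org.apache.kafka.clients.producer.ProducerConfig;
   
   public class MyProducerConfig extends ProducerConfig {
       public MyProducerConfig(final Map<String, Object> props) {
           super(props, false); // compiles only while the (props, doLog) constructor stays visible
       }
   }
   ```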



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14596:
URL: https://github.com/apache/kafka/pull/14596#discussion_r1379482103


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -157,6 +213,30 @@ protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
 throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time.");
 }
 
+@SuppressWarnings("unchecked")
+protected <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,

Review Comment:
   Why do we add this as `protected`? It should be `private` IMHO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14596:
URL: https://github.com/apache/kafka/pull/14596#discussion_r1379471204


##
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record from a versioned state 
store based on its key and timestamp.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+@Evolving
+public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {
+
+private final K key;
+private final Optional<Instant> asOfTimestamp;
+
+private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp) {
+this.key = Objects.requireNonNull(key);

Review Comment:
   This null check should go into `withKey(...)` (to get flatter stack traces). 
We should also add an error message: `Objects.requireNonNull(key, "key cannot 
be null");`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15730) KRaft support in ProducerFailureHandlingTest

2023-11-01 Thread Sameer Tejani (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sameer Tejani resolved KAFKA-15730.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> KRaft support in ProducerFailureHandlingTest
> 
>
> Key: KAFKA-15730
> URL: https://issues.apache.org/jira/browse/KAFKA-15730
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
> Fix For: 3.7.0
>
>
> The following tests in ProducerFailureHandlingTest in 
> core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> need to be updated to support KRaft
> 87 : def testTooLargeRecordWithAckZero(): Unit = {
> 104 : def testTooLargeRecordWithAckOne(): Unit = {
> 133 : def testPartitionTooLargeForReplicationWithAckAll(): Unit = {
> 139 : def testResponseTooLargeForReplicationWithAckAll(): Unit = {
> 147 : def testNonExistentTopic(): Unit = {
> 164 : def testWrongBrokerList(): Unit = {
> 181 : def testInvalidPartition(): Unit = {
> 195 : def testSendAfterClosed(): Unit = {
> 215 : def testCannotSendToInternalTopic(): Unit = {
> 223 : def testNotEnoughReplicas(): Unit = {
> 236 : def testNotEnoughReplicasAfterBrokerShutdown(): Unit = {
> Scanned 260 lines. Found 0 KRaft tests out of 11 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15731) KRaft support in ListOffsetsIntegrationTest

2023-11-01 Thread Sameer Tejani (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sameer Tejani resolved KAFKA-15731.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> KRaft support in ListOffsetsIntegrationTest
> ---
>
> Key: KAFKA-15731
> URL: https://issues.apache.org/jira/browse/KAFKA-15731
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
> Fix For: 3.7.0
>
>
> The following tests in ListOffsetsIntegrationTest in 
> core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala 
> need to be updated to support KRaft
> 55 : def testEarliestOffset(): Unit = {
> 61 : def testLatestOffset(): Unit = {
> 67 : def testMaxTimestampOffset(): Unit = {
> Scanned 96 lines. Found 0 KRaft tests out of 3 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15732) KRaft support in LogAppendTimeTest

2023-11-01 Thread Sameer Tejani (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sameer Tejani resolved KAFKA-15732.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> KRaft support in LogAppendTimeTest
> --
>
> Key: KAFKA-15732
> URL: https://issues.apache.org/jira/browse/KAFKA-15732
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
> Fix For: 3.7.0
>
>
> The following tests in LogAppendTimeTest in 
> core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala need to be 
> updated to support KRaft
> 50 : def testProduceConsume(): Unit = {
> Scanned 78 lines. Found 0 KRaft tests out of 1 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15733) KRaft support in FetchRequestMaxBytesTest

2023-11-01 Thread Sameer Tejani (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sameer Tejani resolved KAFKA-15733.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> KRaft support in FetchRequestMaxBytesTest
> -
>
> Key: KAFKA-15733
> URL: https://issues.apache.org/jira/browse/KAFKA-15733
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
> Fix For: 3.7.0
>
>
> The following tests in FetchRequestMaxBytesTest in 
> core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala need to 
> be updated to support KRaft
> 105 : def testConsumeMultipleRecords(): Unit = {
> Scanned 134 lines. Found 0 KRaft tests out of 1 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15753) KRaft support in BrokerApiVersionsCommandTest

2023-11-01 Thread Sameer Tejani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781948#comment-17781948
 ] 

Sameer Tejani commented on KAFKA-15753:
---

[~linzihao1999] if these issues are simple enough (if you look at other PRs 
that have enabled KRaft mode by using `kafka.utils.TestInfoUtils`) then let's 
go ahead and implement them now so that we can get more coverage on KRaft.

> KRaft support in BrokerApiVersionsCommandTest
> -
>
> Key: KAFKA-15753
> URL: https://issues.apache.org/jira/browse/KAFKA-15753
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in BrokerApiVersionsCommandTest in 
> core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
>  need to be updated to support KRaft
> 50 : def checkBrokerApiVersionCommandOutput(): Unit = {
> Scanned 80 lines. Found 0 KRaft tests out of 1 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14419) Failed SyncGroup leading to partitions lost due to processing during rebalances

2023-11-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781946#comment-17781946
 ] 

Matthias J. Sax commented on KAFKA-14419:
-

[~kirktrue] [~pnee] – will the new consumer threading help with this? Maybe 
it's not something we should try to fix at the Streams layer to begin with?

> Failed SyncGroup leading to partitions lost due to processing during 
> rebalances
> ---
>
> Key: KAFKA-14419
> URL: https://issues.apache.org/jira/browse/KAFKA-14419
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>Reporter: Mikael
>Priority: Major
>
> Trigger scenario:
> Four Kafka client application instances on separate EC2 instances with a 
> total of 8 active and 8 standby stream tasks for the same stream topology, 
> consuming from an input topic with 8 partitions. Sometimes a handful of 
> messages are consumed twice by one of the stream tasks when stream tasks on 
> another application instance join the consumer group after an application 
> instance restart.
> Additional information:
> Messages are produced to the topic by another Kafka streams topology deployed 
> on the same four application instances. I have verified that each message is 
> only produced once by enabling debug logging in the topology flow right 
> before producing each message to the topic.
> Logs from stream thread with duplicate consumption:
>  
> {code:java}
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is 
> already rebalancing
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
> Input records consumed for the first time
> 2022-11-21 15:09:33,919 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Successfully joined group with 
> generation Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,920 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began 
> another rebalance. Need to re-join the group. Sent generation was 
> Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,923 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as 
> lost since generation/me

Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379448271


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map<String, Object> props, boolean doLog) {
+protected ProducerConfig(Map<String, Object> props, boolean doLog) {

Review Comment:
   We did a similar change w/o a KIP in the past 
(https://github.com/apache/kafka/commit/487b95454233e981a65c184c7f3b86adf34058ef)
   
   Actually similar for `ConsumerConfig` 
(https://github.com/apache/kafka/commit/283a19481d1ce4a77f5f465e7b96288db22a8ff1)
 -- note that while this PR is part of a KIP, the KIP is about something 
totally different and does not mention this change at all.
   
   I agree it's borderline, but believe it's ok. -- In the end, I would rather 
exclude `protected` stuff from the JavaDocs as I don't think `protected` is 
actually public API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-01 Thread via GitHub


mjsax commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1379448271


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map<String, Object> props, boolean doLog) {
+protected ProducerConfig(Map<String, Object> props, boolean doLog) {

Review Comment:
   We did a similar change w/o a KIP in the past 
(https://github.com/apache/kafka/commit/487b95454233e981a65c184c7f3b86adf34058ef)
   
   I agree it's borderline, but believe it's ok. -- In the end, I would rather 
exclude `protected` stuff from the JavaDocs as I don't think `protected` is 
actually public API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-11-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781935#comment-17781935
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

[~luke.kirby] – it seems unlikely that there will be a 3.4.2 bug-fix release – 
I am actually surprised that we are doing a 3.5.2... Historically, we focus on 
bug-fix releases for the current minor/major release version (i.e., currently 
3.6) only.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it knows where to start reading from */ 
> producer.send(bb); {code}

Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]

2023-11-01 Thread via GitHub


apoorvmittal10 commented on PR #14618:
URL: https://github.com/apache/kafka/pull/14618#issuecomment-1789838514

   @wcarlson5 @mjsax 16 tests unrelated to the PR are failing. As we have a +1 
from @xvrl, can we please merge this PR now? Thanks for the help.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-11-01 Thread via GitHub


jolshan commented on PR #14489:
URL: https://github.com/apache/kafka/pull/14489#issuecomment-1789834583

   Newest test failures look unrelated: 
   [Build / JDK 17 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserFails(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_17_and_Scala_2_13___testDescribeTokenForOtherUserFails_String__quorum_kraft/)
   [Build / JDK 11 and Scala 2.13 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testRackAwareRangeAssignor_String__quorum_zk/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   [Build / JDK 21 and Scala 2.13 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_21_and_Scala_2_13___testRackAwareRangeAssignor_String__quorum_zk/)
   [Build / JDK 21 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaSubscribe(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_21_and_Scala_2_13___testNoConsumeWithDescribeAclViaSubscribe_String__quorum_kraft/)
   [Build / JDK 21 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserPasses(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_21_and_Scala_2_13___testDescribeTokenForOtherUserPasses_String__quorum_kraft/)
   [Build / JDK 21 and Scala 2.13 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/kafka.server/DescribeClusterRequestTest/Build___JDK_21_and_Scala_2_13___testDescribeClusterRequestExcludingClusterAuthorizedOperations_String__quorum_kraft/)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-11-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1379428164


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+
+public static ClientMetricsManager getInstance() {

Review Comment:
   Done for both `getInstance` and `getDefaultProperties`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-01 Thread via GitHub


kirktrue commented on PR #14690:
URL: https://github.com/apache/kafka/pull/14690#issuecomment-1789830091

   BTW, although it's hard for someone of really little brain like myself to 
follow at times, the chained `Future` mechanism is very interesting and useful 
here. My one concern is making sure we don't allow those `Future`s to be 
completed by the application thread, as that would then kick off all the 
chained logic in the application thread. That would be bad 😆
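   
   A toy illustration of that hazard (hypothetical snippet, not the PR's code): 
callbacks registered on an incomplete `CompletableFuture` run on whichever 
thread later completes it.
   ```
   import java.util.concurrent.CompletableFuture;
   
   final CompletableFuture<Void> f = new CompletableFuture<>();
   f.thenRun(() -> System.out.println("chained step on " + Thread.currentThread().getName()));
   f.complete(null); // if the application thread calls this, the chained step runs there
   ```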


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-11-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1379426628


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2619,6 +2619,9 @@ private ConfigEntry.ConfigSource 
configSource(DescribeConfigsResponse.ConfigSour
 case DYNAMIC_BROKER_LOGGER_CONFIG:
 configSource = 
ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG;
 break;
+case CLIENT_METRICS_CONFIG:

Review Comment:
   Sounds good, thanks for letting me know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-01 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1379392210


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -729,6 +729,13 @@ protected MetadataRequest.Builder newMetadataRequestBuilderForNewTopics() {
 return null;
 }
 
+/**
+ * @return Mapping from topic IDs to topic names for all topics in the cache.
+ */
+public synchronized Map<Uuid, String> topicNames() {

Review Comment:
   Now that I'm noticing, is this a public API violation? It's not in 
`internals` and we're adding a `public` method 🤔



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -73,30 +84,55 @@ public interface MembershipManager {
 /**
  * @return Current assignment for the member.
  */
-ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
+Set currentAssignment();
 
 /**
- * Update the assignment for the member, indicating that the provided 
assignment is the new
- * current assignment.
+ * @return Assignment that the member received from the server but hasn't 
finished processing
+ * yet.
  */
-void 
onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment 
assignment);
+Optional targetAssignment();

Review Comment:
   `targetAssignment()` looks to only be used by unit tests at the moment. Does 
it make sense to remove it from the interface and leave it as a method on the 
implementation only?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -260,42 +743,16 @@ public Optional serverAssignor() {
  * {@inheritDoc}
  */
 @Override
-public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
+public Set currentAssignment() {
 return this.currentAssignment;
 }
 
 
 /**
  * @return Assignment that the member received from the server but hasn't 
completely processed
- * yet. Visible for testing.

Review Comment:
   I made a comment up above about removing `targetAssignment()` from the 
`MembershipManager` interface because it was only used for testing. Does the 
removal of this statement imply that it will be used in non-testing code later?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -24,21 +24,34 @@
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member is not part of the group. This could be the case when it has 
never joined (no call
+ * has been made to the subscribe API), or when the member intentionally 
leaves the group
+ * after a call to the unsubscribe API.
  */
-UNJOINED,
+NOT_IN_GROUP,
+
+/**
+ * Member is attempting to join a consumer group. This could be the case 
when joining for the
+ * first time, or when it has been fenced and tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
  * revoked), and it is processing it. While in this state, the member will
  * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
  * the new assignment effective.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state until the
+ * next heartbeat request is sent out to acknowledge the assignment to the 
server.

Review Comment:
   As I understand, this is saying the consumer will leave the 
`ACKNOWLEDGING_RECONCILED_ASSIGNMENT` state as soon as the next heartbeat is 
sent off, rather than when the next heartbeat response is received, right? What 
happens if that heartbeat request gets lost?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -24,21 +24,34 @@
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member is not part of the group. This could be the case when it has 
never joined (no call
+ * has been made to the subscribe API), or when the member intentionally 
leaves the group
+ * after a call to the unsubscribe API.
  */
-UNJOINED,
+NOT_IN_GROUP,

Review Comment:
   This is the case where the consumer is not in a group _presently_, right? A 
consumer without a configured group ID wouldn't get to this point, would it?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -73,30 +84,55 @@ public interface MembershipManager {
 /**
  * @return Current assignment for the member.

Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on PR #14387:
URL: https://github.com/apache/kafka/pull/14387#issuecomment-1789827363

   I took a first pass at the files today, and will take a closer look at the 
metrics themselves tomorrow or so.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379424990


##
checkstyle/import-control.xml:
##
@@ -240,6 +240,7 @@
   
   
   
+

Review Comment:
   Are we planning not to add any of the new sensor metrics though? I assume 
that's what the description is referring to when it says we need to wait for 
other things to get the sensors.
   
   > Metrics to add after https://github.com/apache/kafka/pull/14408 is merged:
   > offset deletions sensor (OffsetDeletions); Meter(offset-deletion-rate, offset-deletion-count)
   > Metrics to add after https://issues.apache.org/jira/browse/KAFKA-14987 is merged:
   > offset expired sensor (OffsetExpired); Meter(offset-expiration-rate, offset-expiration-count)
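
   For reference, wiring up sensors like the quoted ones with the standard
`Metrics`/`Sensor`/`Meter` API could look like this minimal sketch (the metric
group name below is invented for illustration):

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;

public class OffsetSensorSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        // One sensor backed by a Meter, which exposes both a rate and a count.
        Sensor offsetDeletions = metrics.sensor("OffsetDeletions");
        offsetDeletions.add(new Meter(
            metrics.metricName("offset-deletion-rate", "group-coordinator-metrics"),
            metrics.metricName("offset-deletion-count", "group-coordinator-metrics")));
        offsetDeletions.record(1.0); // record one offset deletion
        metrics.close();
    }
}
```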



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-11-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1379425234


##
clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java:
##
@@ -222,6 +222,7 @@ public enum ConfigSource {
 DYNAMIC_BROKER_LOGGER_CONFIG,   // dynamic broker logger config that 
is configured for a specific broker
 DYNAMIC_BROKER_CONFIG,  // dynamic broker config that is 
configured for a specific broker
 DYNAMIC_DEFAULT_BROKER_CONFIG,  // dynamic broker config that is 
configured as default for all brokers in the cluster
+DYNAMIC_CLIENT_METRICS_CONFIG,  // dynamic client metrics subscription 
config that is configured for all clients

Review Comment:
   I might be wrong, but shouldn't this be needed to make `kafka-configs.sh` 
work for creating a `subscription` for client_metrics? To support the 
`kafka-configs.sh` utility we would require changes in `ConfigEntry` and 
`ConfigSource`. I have this PR that uses the same: 
https://github.com/apache/kafka/pull/14632
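
   For context, a hedged sketch of what creating a client-metrics subscription
through the admin client could look like once the KIP-714 `CLIENT_METRICS`
resource type is fully wired through; the subscription name and config values
below are made up for illustration:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ClientMetricsSubscriptionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // "basic-subscription" is a made-up subscription name.
            ConfigResource resource =
                new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "basic-subscription");
            Collection<AlterConfigOp> ops = Arrays.asList(
                new AlterConfigOp(new ConfigEntry("metrics", "org.apache.kafka.producer."),
                    AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("interval.ms", "30000"),
                    AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get();
        }
    }
}
```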



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379424990


##
checkstyle/import-control.xml:
##
@@ -240,6 +240,7 @@
   
   
   
+

Review Comment:
   Are we planning not to add any of the new sensor metrics though? I assume 
that's what the description is referring to when it says we need to wait for 
other things to get the sensors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-01 Thread via GitHub


lzyLuke commented on PR #14682:
URL: https://github.com/apache/kafka/pull/14682#issuecomment-1789818408

   > Thanks for the PR, @lzyLuke .
   > 
   > Please remove these two lines from "The following features are not fully 
implemented in KRaft mode":
   > 
   > > ```
   > > Modifying certain dynamic configurations on the standalone KRaft 
controller
   > > ```
   > > * Delegation tokens
   
   I could not find the "delegation tokens" related lines. Perhaps they have 
already been deleted?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jeffkbkim commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379419320


##
checkstyle/import-control.xml:
##
@@ -240,6 +240,7 @@
   
   
   
+

Review Comment:
   yeah, unfortunately we need this for the existing metrics



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379418802


##
checkstyle/import-control.xml:
##
@@ -240,6 +240,7 @@
   
   
   
+

Review Comment:
   Or do we have this just to cover the existing generic metrics?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379416743


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   Given this is separate to the requestLocal issue, does it make sense to file 
a JIRA and fix there? We can still consider it for a patch release if we think 
this is a regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-01 Thread via GitHub


jeffkbkim commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1379393118


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -537,22 +533,34 @@ public Map> 
partitionsPendingAssignment() {
 public String currentAssignmentSummary() {
 return "CurrentAssignment(memberEpoch=" + memberEpoch +
 ", previousMemberEpoch=" + previousMemberEpoch +
-", targetMemberEpoch=" + targetMemberEpoch +
 ", state=" + state +
 ", assignedPartitions=" + assignedPartitions +
-", partitionsPendingRevocation=" + partitionsPendingRevocation +
-", partitionsPendingAssignment=" + partitionsPendingAssignment +
+", revokedPartitions=" + revokedPartitions +
 ')';
 }
 
+/**
+ * @return True if the assignment of this member is equals to the 
assignment

Review Comment:
   nit: is equal to



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -511,24 +507,24 @@ public MemberState state() {
 }
 
 /**
- * @return The set of assigned partitions.
+ * @return True of the member is in the Stable state and at the desired 
epoch.

Review Comment:
   nit: True if



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired 
or target
  * assignment state, the state machine takes the necessary steps to converge 
them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the 
member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is 
computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ * If the next assignment contains partitions to be revoked, the member 
stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned 
partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The 
partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can 
transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that 
offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next 
state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the 
STABLE
+ * state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still 
owned by other members in the
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ * If the next assi

[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-11-01 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781922#comment-17781922
 ] 

Philip Nee commented on KAFKA-15602:


Hi [~luke.kirby] - You don't need to become a committer to write a KIP; Apache 
committer means something different :)

 

You just need to send an email to the mailing list to request the permission 
per...
{code:java}
If this is your first time contributing:
1. Sign up for the Developer mailing list d...@kafka.apache.org . The
   instructions to sign up are here: http://kafka.apache.org/contact
2. Create a wiki ID (https://cwiki.apache.org/confluence/signup.action)
3. Create a Jira ID (it's a different system than the wiki) using the ASF
   Self-serve Portal (https://selfserve.apache.org/jira-account.html)
4. Send an email to the dev mailing list (d...@kafka.apache.org) containing
   your wiki ID and Jira ID, requesting permissions to contribute to Apache
   Kafka.
{code}
Looking forward to seeing the KIP!

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start with position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e

Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379411500


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorMetricsShard.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.MetricName;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A CoordinatorMetricsShard is mapped to a single CoordinatorShard. For 
gauges, each metrics shard increments/decrements
+ * based on the operations performed. Then, {@link CoordinatorMetrics} will 
perform aggregations across all shards.
+ *
+ * For sensors, each shard individually records the observed values.
+ */
+public interface CoordinatorMetricsShard {
+/**
+ * Increment a gauge.
+ *
+ * @param metricName the metric name.
+ * @param isGlobal   whether the metric is globally shared across shards
+ */
+void increment(MetricName metricName, boolean isGlobal);

Review Comment:
   would it be more useful to have separate methods for global vs not global?
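
   For illustration, the split-method shape being suggested might look like
this (a hypothetical alternative, not the PR's interface):

```java
import com.yammer.metrics.core.MetricName;

// Hypothetical alternative to increment(MetricName, boolean): one explicit
// method per scope instead of a boolean flag at every call site.
public interface CoordinatorMetricsShardAlt {
    /** Increment a gauge local to this shard. */
    void incrementLocal(MetricName metricName);

    /** Increment a gauge aggregated globally across all shards. */
    void incrementGlobal(MetricName metricName);
}
```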



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-01 Thread via GitHub


hachikuji commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379404877


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   @junrao If the lock cannot protect the write, then the validation in the 
group coordinator cannot be relied on. In principle, it would be possible for 
an older offset commit to override a newer one for example. A more likely 
scenario might be for a group to rebalance while an offset commit is awaiting 
validation. This would allow a fenced consumer to be able to commit offsets. 
Without the lock, I think we'd have to introduce additional bookkeeping to 
track dependencies between inflight writes.
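
   A toy, deterministic replay of the interleaving described above (names are
invented for illustration; this is not ReplicaManager code):

```java
import java.util.concurrent.atomic.AtomicLong;

public class CheckThenActRace {
    static int memberEpoch = 1;                 // epoch the coordinator believes is current
    static final AtomicLong committedOffset = new AtomicLong(0);

    static boolean validate(int epoch) { return epoch == memberEpoch; }
    static void append(long offset)    { committedOffset.set(offset); }

    public static void main(String[] args) {
        boolean aValid = validate(1);  // t0: consumer A (epoch 1) passes validation for offset 100
        memberEpoch = 2;               // t1: rebalance fences A; consumer B is now epoch 2
        if (validate(2)) append(200);  //     B validates and appends offset 200
        if (aValid) append(100);       // t2: A's stale, pre-validated append still lands
        System.out.println(committedOffset.get()); // prints 100 -- the older commit won
    }
}
```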



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2023-11-01 Thread via GitHub


CalvinConfluent commented on PR #14604:
URL: https://github.com/apache/kafka/pull/14604#issuecomment-1789790905

   The failed tests are unrelated to this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379402554


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -352,7 +374,14 @@ public CoordinatorResult 
commitOffset(
 });
 });
 
-return new CoordinatorResult<>(records, response);
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {

Review Comment:
   nit: are my eyes playing tricks on me or is the _ two __?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379398861


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -503,6 +547,7 @@ public void onLoaded(MetadataImage newImage) {
 MetadataDelta emptyDelta = new MetadataDelta(newImage);
 groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
 offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta);
+coordinatorMetrics.activateMetricsShard(metricsShard);

Review Comment:
   Is there a reason we didn't want to deactivate the shard in onUnloaded?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379398861


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -503,6 +547,7 @@ public void onLoaded(MetadataImage newImage) {
 MetadataDelta emptyDelta = new MetadataDelta(newImage);
 groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
 offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta);
+coordinatorMetrics.activateMetricsShard(metricsShard);

Review Comment:
   It's probably not a concern for this PR, but it would be nice if the 
activate and deactivate could be in the same file. 
   I think any coordinator shard could have an unload method for example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379393439


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -140,7 +159,12 @@ public GroupCoordinatorShard build() {
 throw new IllegalArgumentException("Time must be set.");
 if (timer == null)
 throw new IllegalArgumentException("Timer must be set.");
+if (coordinatorMetrics == null)
+throw new IllegalArgumentException("CoordinatorMetricsShard 
must be set.");

Review Comment:
   I noticed we check the metrics shard in another file to ensure it is a 
GroupCoordinatorMetricsShard. Should we check the type sooner? Should we also 
check that this is of type GroupCoordinatorMetrics (and should the error 
message here not say "shard")?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] DO NOT MERGE: introduce internal StoreFactory [kafka]

2023-11-01 Thread via GitHub


ableegoldman commented on code in PR #14659:
URL: https://github.com/apache/kafka/pull/14659#discussion_r1379390131


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -611,33 +545,38 @@ public final  void addProcessor(final 
String name,
 
 public final void addStateStore(final StoreBuilder storeBuilder,
 final String... processorNames) {
+addStateStore(new StoreBuilderWrapper<>(storeBuilder), false, 
processorNames);
+}
+
+public final void addStateStore(final StoreFactory storeBuilder,
+final String... processorNames) {
 addStateStore(storeBuilder, false, processorNames);
 }
 
-public final void addStateStore(final StoreBuilder storeBuilder,
+public final void addStateStore(final StoreFactory storeFactory,
 final boolean allowOverride,
 final String... processorNames) {
-Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
-final StateStoreFactory stateFactory = 
stateFactories.get(storeBuilder.name());
-if (!allowOverride && stateFactory != null && 
!stateFactory.builder.equals(storeBuilder)) {
-throw new TopologyException("A different StateStore has already 
been added with the name " + storeBuilder.name());
+Objects.requireNonNull(storeFactory, "stateStoreFactory can't be 
null");
+final StoreFactory stateFactory = 
stateFactories.get(storeFactory.name());
+if (!allowOverride && stateFactory != null && 
!stateFactory.isCompatibleWith(storeFactory)) {
+throw new TopologyException("A different StateStore has already 
been added with the name " + storeFactory.name());
 }
-if (globalStateBuilders.containsKey(storeBuilder.name())) {
-throw new TopologyException("A different GlobalStateStore has 
already been added with the name " + storeBuilder.name());
+if (globalStateBuilders.containsKey(storeFactory.name())) {
+throw new TopologyException("A different GlobalStateStore has 
already been added with the name " + storeFactory.name());
 }
 
-stateFactories.put(storeBuilder.name(), new 
StateStoreFactory<>(storeBuilder));
+stateFactories.put(storeFactory.name(), storeFactory);
 
 if (processorNames != null) {
 for (final String processorName : processorNames) {
 Objects.requireNonNull(processorName, "processor name must not 
be null");
-connectProcessorAndStateStore(processorName, 
storeBuilder.name());
+connectProcessorAndStateStore(processorName, 
storeFactory.name());
 }
 }
 nodeGroups = null;
 }
 
-public final  void addGlobalStore(final StoreBuilder 
storeBuilder,
+public final  void addGlobalStore(final StoreFactory 
storeBuilder,

Review Comment:
   very understandable -- just wanted to leave a comment in case you missed 
it/to help identify some of those two million lines 😭 



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -611,33 +545,38 @@ public final  void addProcessor(final 
String name,
 
 public final void addStateStore(final StoreBuilder storeBuilder,
 final String... processorNames) {
+addStateStore(new StoreBuilderWrapper<>(storeBuilder), false, 
processorNames);
+}
+
+public final void addStateStore(final StoreFactory storeBuilder,
+final String... processorNames) {
 addStateStore(storeBuilder, false, processorNames);
 }
 
-public final void addStateStore(final StoreBuilder storeBuilder,
+public final void addStateStore(final StoreFactory storeFactory,
 final boolean allowOverride,
 final String... processorNames) {
-Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
-final StateStoreFactory stateFactory = 
stateFactories.get(storeBuilder.name());
-if (!allowOverride && stateFactory != null && 
!stateFactory.builder.equals(storeBuilder)) {
-throw new TopologyException("A different StateStore has already 
been added with the name " + storeBuilder.name());
+Objects.requireNonNull(storeFactory, "stateStoreFactory can't be 
null");
+final StoreFactory stateFactory = 
stateFactories.get(storeFactory.name());
+if (!allowOverride && stateFactory != null && 
!stateFactory.isCompatibleWith(storeFactory)) {
+throw new TopologyException("A different StateStore has already 
been added with the name " + storeFactory.name());
 }
-if (globalStateBuilders.containsKey(storeBuilder.name())) {
-throw new TopologyException("

Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379389203


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -121,6 +126,20 @@ public CoordinatorShardBuilder withTimer(
 return this;
 }
 
+@Override
+public CoordinatorShardBuilder 
withCoordinatorMetrics(
+CoordinatorMetrics coordinatorMetrics
+) {
+this.coordinatorMetrics = coordinatorMetrics;
+return this;
+}
+
+@Override

Review Comment:
   for my understanding, this topic partition is only used for metrics tagging?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379388116


##
checkstyle/import-control.xml:
##
@@ -240,6 +240,7 @@
   
   
   
+

Review Comment:
   I recall we removed yammer metrics from the last pr, do we also want to do 
that here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-11-01 Thread Luke Kirby (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781911#comment-17781911
 ] 

Luke Kirby commented on KAFKA-15602:


Thanks so much team! Maybe not the right place to ask this, but is there a plan 
for a 3.4.2 release? I can see that there's a 3.5.2 just waiting on some things.

On the KIP front, I am still interested in pursuing that, but it's not as 
pressing a concern. Reading through the guidance there (thanks Philip!) it 
looks like I have to request committer status first? I'll probably get some 
CCLA stuff rolling on my end to make that happen.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start with position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limi

[jira] [Updated] (KAFKA-15770) org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy is flaky

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15770:

Component/s: unit tests

> org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
>  is flaky 
> ---
>
> Key: KAFKA-15770
> URL: https://issues.apache.org/jira/browse/KAFKA-15770
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Alok Thatikunta
>Priority: Major
>
> Test fails on CI, passes locally
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldHaveSamePositionBoundActiveAndStandBy/]
> {code:java}
> java.lang.AssertionError: 
> Result:SucceededQueryResult{result=<0,1698511250443>, executionInfo=[], 
> position=Position{position={input-topic={0=50
> Expected: is 
>  but: was  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-01 Thread via GitHub


lianetm opened a new pull request, #14690:
URL: https://github.com/apache/kafka/pull/14690

   Updates to the client state machine and the Heartbeat and Commit managers, 
to support the required states and transitions as the member interacts with a 
consumer group, allowing it to commit offsets and receive the assignments that 
the member reconciles. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15627: KIP-951's Leader discovery optimisations on the client [kafka]

2023-11-01 Thread via GitHub


wcarlson5 merged PR #14685:
URL: https://github.com/apache/kafka/pull/14685


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15627: KIP-951's Leader discovery optimisations on the client [kafka]

2023-11-01 Thread via GitHub


wcarlson5 commented on PR #14685:
URL: https://github.com/apache/kafka/pull/14685#issuecomment-1789576997

   I'm satisfied with the comments and discussion on the old PR 
https://github.com/apache/kafka/pull/14564, so I'm going to go ahead and merge 
this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15627: KIP-951's Leader discovery optimisations on the client [kafka]

2023-11-01 Thread via GitHub


wcarlson5 commented on code in PR #14685:
URL: https://github.com/apache/kafka/pull/14685#discussion_r1379261920


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, 
MetadataResponse response, b
 log.debug("Updated cluster metadata updateVersion {} to {}", 
this.updateVersion, this.cache);
 }
 
+/**
+ * Update the metadata by merging existing metadata with the input leader 
information and nodes. This is called whenever
+ * partial updates to metadata are returned in a response from broker(ex - 
ProduceResponse & FetchResponse).
+ * Note that the updates via Metadata RPC are handled separately in 
({@link #update}).
+ * Both partitionLeader and leaderNodes override the existing metadata. 
Non-overlapping metadata is kept as it is.
+ * @param partitionLeaders map of new leadership information for 
partitions.
+ * @param leaderNodes a list of nodes for leaders in the above map.
+ * @return a set of partitions, for which leaders were updated.
+ */
+public synchronized Set 
updatePartially(Map partitionLeaders, 
List leaderNodes) {
+Map newNodes = 
leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+// Insert non-overlapping nodes from existing-nodes into new-nodes.
+this.cache.cluster().nodes().stream().forEach(node -> 
newNodes.putIfAbsent(node.id(), node));
+
+// Create partition-metadata for all updated partitions. Exclude 
updates for partitions -
+// 1. for which the corresponding partition has newer leader in 
existing metadata.
+// 2. for which corresponding leader's node is missing in the 
new-nodes.
+// 3. for which the existing metadata doesn't know about the partition.
+List updatePartitionMetadata = new ArrayList<>();
+Set updatedPartitions = new HashSet<>();
+for (Entry partitionLeader: partitionLeaders.entrySet()) {
+TopicPartition partition = (TopicPartition) 
partitionLeader.getKey();
+Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) 
partitionLeader.getValue();
+if (!newLeader.epoch.isPresent() || 
!newLeader.leaderId.isPresent()) {
+log.trace("For {}, incoming leader information is incomplete 
{}", partition, newLeader);
+continue;
+}
+if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= 
currentLeader.epoch.get()) {
+log.trace("For {}, incoming leader({}) is not-newer than the 
one in the existing metadata {}, so ignoring.", partition, newLeader, 
currentLeader);
+continue;
+}
+if (!newNodes.containsKey(newLeader.leaderId.get())) {
+log.trace("For {}, incoming leader({}), the corresponding node 
information for node-id {} is missing, so ignoring.", partition, newLeader, 
newLeader.leaderId.get());
+continue;
+}
+if (!this.cache.partitionMetadata(partition).isPresent()) {
+log.trace("For {}, incoming leader({}), no longer has cached 
metadata so ignoring.", partition, newLeader);
+continue;
+}
+
+MetadataResponse.PartitionMetadata existingMetadata = 
this.cache.partitionMetadata(partition).get();
+MetadataResponse.PartitionMetadata updatedMetadata = new 
MetadataResponse.PartitionMetadata(
+existingMetadata.error,
+partition,
+newLeader.leaderId,
+newLeader.epoch,
+existingMetadata.replicaIds,
+existingMetadata.inSyncReplicaIds,
+existingMetadata.offlineReplicaIds
+);
+updatePartitionMetadata.add(updatedMetadata);
+
+lastSeenLeaderEpochs.put(partition, newLeader.epoch.get());
+
+updatedPartitions.add(partition);
+}
+
+if (updatedPartitions.isEmpty()) {
+log.debug("No relevant metadata updates.");
+return updatedPartitions;
+}
+
+// Get topic-ids for filtered partitions from existing topic-ids.
+Map existingTopicIds = this.cache.topicIds();
+Map filteredTopicIds = updatePartitionMetadata.stream()
+.filter(e -> existingTopicIds.containsKey(e.topic()))
+.collect(Collectors.toMap(e -> e.topic(), e -> 
existingTopicIds.get(e.topic(;
+
+Set updatedTopics = 
updatePartitionMetadata.stream().map(MetadataResponse.PartitionMetadata::topic).collect(Collectors.toSet());
+
+if (log.isTraceEnabled()) {
+updatePartitionMetadata.forEach(
+partMetadata -> log.trace("For {} updating to leader-id {}, 
leader-epoch {}.", partMetadata, partMetadata.leaderId.get(), 
partMetadata.leaderEpoch.get(

Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-11-01 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1379231747


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use. We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used. Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor 
to use. If no assignor is specified, " +

Review Comment:
   To make it more consistent: DEFAULT_GROUP_REMOTE_ASSIGNOR
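
   For reference, a minimal sketch of how an application would set these
proposed configs (string literals are used since the constant names are still
under review, and "uniform" is an assumed server-side assignor name):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupProtocolConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put("group.protocol", "consumer");       // opt in to the new KIP-848 protocol
        props.put("group.remote.assignor", "uniform"); // optional server-side assignor
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}
```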



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-01 Thread via GitHub


cmccabe commented on PR #14682:
URL: https://github.com/apache/kafka/pull/14682#issuecomment-1789520190

   Thanks for the PR, @lzyLuke .
   
   Please remove these two lines from "The following features are not fully 
implemented in KRaft mode":
   > Modifying certain dynamic configurations on the standalone KRaft 
controller
   > Delegation tokens


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-01 Thread via GitHub


cmccabe commented on code in PR #14682:
URL: https://github.com/apache/kafka/pull/14682#discussion_r1379209724


##
docs/ops.html:
##
@@ -3613,23 +3613,22 @@ https://issues.apache.org/jira/projects/KAFKA" 
target="_blank">project JIRA and the "kraft" component.
   
 
-  
-
   Terminology
   
-Brokers that are in ZK mode store their metadata in Apache 
ZooKeeper. This is the old mode of handling metadata.
+Brokers that are in ZK mode store their metadata in Apache 
ZooKepeer. This is the old mode of handling metadata.
 Brokers that are in KRaft mode store their metadata in a KRaft 
quorum. This is the new and improved mode of handling metadata.
 Migration is the process of moving cluster metadata from 
ZooKeeper into a KRaft quorum.
   
 
   Migration Phases
   In general, the migration process passes through several phases.
+
+  For
   
-In the initial phase, all the brokers are in ZK mode, and there 
is a ZK-based controller.
-During the initial metadata load, a KRaft quorum loads the 
metadata from ZooKeeper,
-In hybrid phase, some brokers are in ZK mode, but there is a 
KRaft controller.
-In dual-write phase, all brokers are KRaft, but the KRaft 
controller is continuing to write to ZK.
-When the migration has been finalized, we no longer write 
metadata to ZooKeeper.
+In the pre-migration phase, all the brokers are in ZK mode, and 
there is a ZK-based controller. But a KRaft controller has been provisioned and 
has migration enabled

Review Comment:
   Change "controller" to "controller quorum"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-01 Thread via GitHub


cmccabe commented on code in PR #14682:
URL: https://github.com/apache/kafka/pull/14682#discussion_r1379209314


##
docs/ops.html:
##
@@ -3613,23 +3613,22 @@ https://issues.apache.org/jira/projects/KAFKA" 
target="_blank">project JIRA and the "kraft" component.
   
 
-  
-
   Terminology
   
-Brokers that are in ZK mode store their metadata in Apache 
ZooKeeper. This is the old mode of handling metadata.
+Brokers that are in ZK mode store their metadata in Apache 
ZooKepeer. This is the old mode of handling metadata.

Review Comment:
   This adds a typo



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-11-01 Thread via GitHub


AndrewJSchofield commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1379170868


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use. We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used. Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";

Review Comment:
   `GROUP_REMOTE_ASSIGNOR_CONFIG` would be better.



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use. We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used. Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor 
to use. If no assignor is specified, " +

Review Comment:
   `GROUP_REMOTE_ASSIGNOR_DOC`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-01 Thread via GitHub


lzyLuke commented on code in PR #14682:
URL: https://github.com/apache/kafka/pull/14682#discussion_r1379152694


##
docs/ops.html:
##
@@ -3787,7 +3786,7 @@ Migrating brokers to KRaft
   Reverting to ZooKeeper mode During the Migration
 While the cluster is still in migration mode, it is possible to revert to 
ZK mode. In order to do this:
 
-  One by one, take each KRaft broker down. Remove the 
__cluster_metadata directory on the broker. Then, restart the broker as 
ZooKeeper.
+  One by one, take each KRaft broker down. Remove the 
__cluster_metadata directory on the broker. Then, restart the broker as 
zookeeper mode.

Review Comment:
   maybe `in` would be better :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


