Re: [PR] [KAFKA-9965] Fix accumulator tryAppend, so that fresh new producerBatch is created [kafka]
blue-int commented on PR #12462: URL: https://github.com/apache/kafka/pull/12462#issuecomment-1790149877 I want RoundRobinPartitioner to work again. When might this PR be merged and released? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments
[ https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15682: - Description: One implementation of RemoteLogMetadataManager is TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic {{__remote_log_metadata}} to store metadata about the remote log segments. Unlike other internal topics, which have compaction enabled, this topic is not compacted and its retention is set to unlimited. Unlimited retention is not practical in real-world use cases, because the topic's local disk footprint grows very large over time. It is assumed that the user will set the retention to a reasonable value that exceeds the maximum retention of all the user-created topics (max + X). We can't rely on that assumption alone and need an assertion to ensure that the internal {{__remote_log_metadata}} segments are not eligible for deletion before all the relevant remote log segments uploaded for user topics have expired. Otherwise there will be dangling remote log segments that won't be cleared once all the brokers are restarted after the internal topic's retention cleanup. See the discussion thread: https://github.com/apache/kafka/pull/14576#discussion_r1368576126 was: The same description, without the link to the discussion thread.
> Ensure internal remote log metadata topic does not expire its segments before > deleting user-topic segments > -- > > Key: KAFKA-15682 > URL: https://issues.apache.org/jira/browse/KAFKA-15682 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Priority: Major -- This message was sent by Atlassian Jira (v8.20.10#820010)
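The assertion requested in KAFKA-15682 could take roughly the following shape. This is only a sketch under stated assumptions: `MetadataTopicRetentionCheck`, `outlivesUserTopics` and the `marginMs` parameter (the "X" in "max + X") are hypothetical names, not Kafka APIs, and only time-based retention is considered (size-based retention is ignored).

```java
import java.util.Map;

/** Hypothetical helper, not part of Kafka. All retention values are in milliseconds. */
final class MetadataTopicRetentionCheck {
    /** Kafka uses -1 for retention.ms to mean "no time limit". */
    static final long UNLIMITED = -1L;

    /**
     * Returns true if the metadata topic's retention exceeds every user topic's
     * retention by at least marginMs.
     */
    static boolean outlivesUserTopics(long metadataRetentionMs, long marginMs,
                                      Map<String, Long> userTopicRetentionMs) {
        if (metadataRetentionMs == UNLIMITED) {
            return true; // never expires, so it trivially outlives every user topic
        }
        for (long userRetentionMs : userTopicRetentionMs.values()) {
            if (userRetentionMs == UNLIMITED) {
                return false; // a finite retention cannot outlive an unlimited one
            }
            if (metadataRetentionMs - marginMs < userRetentionMs) {
                return false; // metadata segments could expire before the user segments
            }
        }
        return true;
    }
}
```

A caller could run such a check whenever a topic's retention configuration changes and reject or log the change when it returns false; where the check belongs in the broker is left open by the ticket.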
Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]
dengziming commented on PR #14595: URL: https://github.com/apache/kafka/pull/14595#issuecomment-1790107126 Hello @cmccabe @jsancio, no flaky tests were found in the latest 4 builds, please take a look.
Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]
dengziming commented on PR #14595: URL: https://github.com/apache/kafka/pull/14595#issuecomment-1790105902 I triggered the CI 5 times. Only `ConsumerBounceTest` is flaky and all other tests succeed. I have reverted `ConsumerBounceTest` and will convert it in the future. https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//#showFailuresLink ``` [Build / JDK 17 and Scala 2.13 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_17_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft/) [Build / JDK 17 and Scala 2.13 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_17_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft_2/) [Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft/) [Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft_2/) [Build / JDK 8 and Scala 2.12 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor_String__quorum_kraft/) [Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_8_and_Scala_2_12___testConsumptionWithBrokerFailures_String__quorum_zk/) [Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_8_and_Scala_2_12___testConsumptionWithBrokerFailures_String__quorum_kraft/) [Build / JDK 21 and Scala 2.13 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_21_and_Scala_2_13___testConsumptionWithBrokerFailures_String__quorum_kraft/) [Build / JDK 21 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_21_and_Scala_2_13___testTrustStoreAlter_String__quorum_kraft/) ``` https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/1//#showFailuresLink ``` [Build / JDK 21 and Scala 2.13 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/2//testReport/junit/kafka.server/DescribeClusterRequestTest/Build___JDK_21_and_Scala_2_13___testDescribeClusterRequestIncludingClusterAuthorizedOperations_String__quorum_kraft/) [Build / JDK 21 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/2//testReport/junit/kafka.server/DescribeClusterRequestTest/Build___JDK_21_and_Scala_2_13___testDescribeClusterRequestIncludingClusterAuthorizedOperations_String__quorum_kraft_2/) ``` https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/3//#showFailuresLink ``` [Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/3//testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testBumpTransactionalEpoch_String__quorum_kraft/) ``` https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14595/4//#showFailuresLink ``` [Build / JDK 8 and Scala 2.12 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/ka
[jira] [Commented] (KAFKA-15536) dynamically resize remoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781999#comment-17781999 ] Henry Cai commented on KAFKA-15536: --- Can this be backported to the 3.6 branch? > dynamically resize remoteIndexCache > --- > > Key: KAFKA-15536 > URL: https://issues.apache.org/jira/browse/KAFKA-15536 > Project: Kafka > Issue Type: Improvement > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Luke Chen >Assignee: hudeqi >Priority: Major > Fix For: 3.7.0 > > > context: > https://github.com/apache/kafka/pull/14243#discussion_r1320630057
[jira] [Commented] (KAFKA-15632) Drop the invalid remote log metadata events
[ https://issues.apache.org/jira/browse/KAFKA-15632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781998#comment-17781998 ] Henry Cai commented on KAFKA-15632: --- Can this fix be backported to the 3.6 branch? [~showuon] [~ckamal] > Drop the invalid remote log metadata events > > > Key: KAFKA-15632 > URL: https://issues.apache.org/jira/browse/KAFKA-15632 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > The {{__remote_log_metadata}} topic cleanup policy is set to {{DELETE}} and its > default retention is set to unlimited. > The expectation is that the user will configure a retention time for this internal topic > that is at least the maximum retention of all the other user-created topics in the > cluster. We cannot keep it unlimited, as the contents of this internal > topic need to be in local storage. > The RemoteLogMetadata cache expects the events to arrive in an order permitted by > [RemoteLogSegmentState#isValidTransition|https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java#L93] > Once retention expires for this topic, say after 30 days due to a breach > by size or time, only partial metadata events may remain, and the > [cache|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java#L160] > starts to throw RemoteResourceNotFoundError.
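The idea in the ticket title, dropping events that do not form a valid transition instead of throwing, can be sketched as follows. The class `SegmentEventFilter`, its `State` enum and the transition rules are a simplified, hypothetical stand-in for illustration; they are not the actual `RemoteLogSegmentState` or `RemoteLogMetadataCache` code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: keeps the last known state per segment and drops invalid events. */
final class SegmentEventFilter {
    enum State { COPY_STARTED, COPY_FINISHED, DELETE_STARTED, DELETE_FINISHED }

    private final Map<String, State> lastState = new HashMap<>();

    /** Simplified transition rules, assumed for this example only. */
    static boolean isValidTransition(State from, State to) {
        if (from == null) {
            return to == State.COPY_STARTED; // a segment's first event must start a copy
        }
        switch (from) {
            case COPY_STARTED:
                return to == State.COPY_FINISHED || to == State.DELETE_STARTED;
            case COPY_FINISHED:
                return to == State.DELETE_STARTED;
            case DELETE_STARTED:
                return to == State.DELETE_FINISHED;
            default:
                return false; // DELETE_FINISHED is terminal
        }
    }

    /** Applies the event if the transition is valid; otherwise drops it and returns false. */
    boolean apply(String segmentId, State next) {
        State current = lastState.get(segmentId);
        if (!isValidTransition(current, next)) {
            return false; // e.g. the earlier events were already removed by topic retention
        }
        lastState.put(segmentId, next);
        return true;
    }
}
```

With this shape, a `COPY_FINISHED` event whose `COPY_STARTED` event has already been deleted by retention is ignored rather than surfacing as an error to the reader of the topic.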
[jira] [Comment Edited] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781751#comment-17781751 ] Arpit Goyal edited comment on KAFKA-15388 at 11/2/23 5:04 AM: -- [~divijvaidya] I don't follow what you mean by the archival functionality. By archival, do you mean what happens once the segments have expired because of the retention mechanism? Do you mean that we should take care of the end offset value while reading from remote storage, i.e. read from the next segment's base offset instead of deriving the position from the end offset? I found two places where we use it: 1. When updating the logStartOffset during retention-based cleanup of the log segments https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L827 2. In the read path of the remote storage https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1263 Do we need to correct both places? was (Author: JIRAUSER301926): The same comment, except that the second sentence read: "From archival you mean copying segments would not be impacted."
> Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Blocker > Fix For: 3.7.0 > > > Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517] > > There are 3 paths I looked at: > * When data is moved to remote storage (1) > * When data is read from remote storage (2) > * When data is deleted from remote storage (3) > (1) Does not have a problem with compacted topics. Compacted segments are > uploaded and their metadata claims they contain offsets from the baseOffset of > the segment until the next segment's baseOffset. There are no gaps in offsets. > (2) Does not have a problem if a customer is querying offsets which do not > exist within a segment, as long as there are offsets after the queried offset within > the same segment. *However, it does have a problem when the next available > offset is in a subsequent segment.* > (3) For data deleted via DeleteRecords there is no problem. For data deleted > via retention there is no problem. > > *I believe the proper solution to (2) is to make tiered storage continue > looking for the next greater offset in subsequent segments.* > Steps to reproduce the issue: > {code:java} > // TODO (christo) > {code}
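The proposed fix for path (2) can be illustrated with a small in-memory model. This is a sketch only: `CompactedSegmentLookup` and its methods are hypothetical, and each segment is modelled as a sorted array of the offsets that survived compaction rather than as real remote log segment metadata.

```java
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical model of remote segments of a compacted topic, keyed by base offset. */
final class CompactedSegmentLookup {
    // baseOffset -> ascending offsets that still exist in that segment after compaction
    private final NavigableMap<Long, long[]> segments = new TreeMap<>();

    void addSegment(long baseOffset, long... survivingOffsets) {
        segments.put(baseOffset, survivingOffsets);
    }

    /** Finds the smallest existing offset >= target, continuing into later segments if needed. */
    OptionalLong nextOffsetAtOrAfter(long target) {
        if (segments.isEmpty()) {
            return OptionalLong.empty();
        }
        // Start with the segment whose range covers the target, or the first one.
        Long startKey = segments.floorKey(target);
        if (startKey == null) {
            startKey = segments.firstKey();
        }
        // Do not stop at the covering segment: keep scanning subsequent segments.
        for (long[] offsets : segments.tailMap(startKey, true).values()) {
            for (long offset : offsets) {
                if (offset >= target) {
                    return OptionalLong.of(offset);
                }
            }
        }
        return OptionalLong.empty();
    }
}
```

The loop over `tailMap` is the point of the example: a lookup that inspected only the covering segment would return nothing for a target whose next surviving offset lives in a later segment.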
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
mjsax commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1379595660
## clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java: ##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource tags.
+ * These additional resource tags will be added before emitting metrics by the telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+    private final MetricKey key;
+    private final Metric.Builder metricBuilder;
+
+    private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+        this.key = key;
+        this.metricBuilder = metricBuilder;
+    }
+
+    @Override
+    public MetricKey key() {
+        return key;
+    }
+
+    public Metric.Builder metric() {
+        return metricBuilder;
+    }
+
+    public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) {
+        return new SinglePointMetric(metricKey, metric);
+    }
+
+    /*
+        Methods to construct gauge metric type.
+     */
+    public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    /*
+        Methods to construct sum metric type.
+     */
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) {
+        return sum(metricKey, value, monotonic, timestamp, null);
+    }
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp,
+        Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        if (startTimestamp != null) {
+            point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+        }
+
+        return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+    }
+
+    public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic,
+        Instant timestamp, Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value)
Review Comment: Agree with Apoorv. We should have checks for anything "public", but not necessarily _everywhere_.
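The hunk above calls a `toTimeUnixNanos` helper whose body is not shown in the quoted diff. One plausible way to convert an `Instant` into nanoseconds since the Unix epoch, consistent with the `TimeUnit` import in the file, is sketched below; the class name `TimeUnixNanos` is hypothetical and the body is an assumption, not the PR's code.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-alone version of the conversion helper. */
final class TimeUnixNanos {
    /** Converts an Instant to nanoseconds since the Unix epoch. */
    static long toTimeUnixNanos(Instant t) {
        // Whole seconds converted to nanos, plus the nanosecond-of-second part.
        return TimeUnit.SECONDS.toNanos(t.getEpochSecond()) + t.getNano();
    }
}
```

Note that `TimeUnit.toNanos` saturates at `Long.MAX_VALUE` rather than overflowing, which only matters for instants far outside the range that metric timestamps use.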
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
mjsax commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1379594061
## clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java: ##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+    private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+    /**
+     * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one.
+     *
+     * @param metricKey the key for which to calculate a getAndSet.
+     * @param now the timestamp for the new value.
+     * @param value the current value.
+     * @return the timestamp of the previous entry and its value. If there
+     * isn't a previous entry, then this method returns {@link Optional#empty()}
+     */
+    public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+        InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+        AtomicReference<InstantAndValue<T>> valueOrNull = counters
Review Comment: I'm still not sure I follow. I don't see any code that would return the value of the `counters` map (only `remove`, but given that the map is thread-safe, we could not return it to two threads; a second thread would get `null` back). And the put is protected by the concurrent map.
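The thread-safety question in the review comment above is easier to reason about with a complete, minimal version of the pattern. The sketch below is an assumption about how the truncated `getAndSet` might continue, not the PR's code: `LastValueSketch` is a hypothetical class, and it uses a generic key type `K` in place of `MetricKey` so that it compiles on its own.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, self-contained variant of the tracker discussed in the review. */
final class LastValueSketch<K, T> {
    static final class InstantAndValue<T> {
        final Instant instant;
        final T value;

        InstantAndValue(Instant instant, T value) {
            this.instant = instant;
            this.value = value;
        }
    }

    private final ConcurrentMap<K, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();

    /** Stores the new value and returns the previous one, if any. */
    Optional<InstantAndValue<T>> getAndSet(K key, Instant now, T value) {
        InstantAndValue<T> next = new InstantAndValue<>(now, value);
        // putIfAbsent is atomic: exactly one thread installs the first reference and sees null.
        AtomicReference<InstantAndValue<T>> existing = counters.putIfAbsent(key, new AtomicReference<>(next));
        if (existing == null) {
            return Optional.empty();
        }
        // Later writers swap atomically, so each previous value is handed out exactly once.
        return Optional.ofNullable(existing.getAndSet(next));
    }

    /** Removes the entry; a second concurrent caller gets Optional.empty(). */
    Optional<InstantAndValue<T>> remove(K key) {
        AtomicReference<InstantAndValue<T>> removed = counters.remove(key);
        return removed == null ? Optional.empty() : Optional.ofNullable(removed.get());
    }
}
```

In this shape the `AtomicReference` is what makes the read-then-replace step atomic for an existing key, while the map alone only guarantees atomic insertion and removal. Whether the extra reference is needed, compared with a single atomic `ConcurrentMap.put`, is the kind of question the review thread is raising.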
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
mjsax commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1374059672
## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ol>
+ * <li>{@link Gauge}</li>
+ * <li>{@link Measurable}</li>
+ * </ol>
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ *
+ * The Sum as it is a sample sum which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter metrics: a rate and a
+ * count. For eg: org.apache.kafka.common.network.Selector has Meter metric for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with a array of Frequency objects. When a Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the
+ * compound type, each component measurables is converted to a GAUGE_DOUBLE.
+ *
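The conversion rules stated in the Javadoc above can be summarised as a small lookup. The following is a sketch only: `MetricKindSketch`, `StatType` and `Emitted` are hypothetical names used for illustration, the real collector inspects Kafka's metric classes rather than an enum, and treating a count as CUMULATIVE_DOUBLE is an assumption drawn from the statement that a count always increases.

```java
/** Hypothetical summary of the Javadoc's type-conversion rules. */
final class MetricKindSketch {
    /** Simplified stand-ins for the Kafka stat types named in the Javadoc. */
    enum StatType { CUMULATIVE_COUNT, TOTAL, WINDOWED_SUM, FREQUENCY_COMPONENT }

    /** The two output kinds named in the Javadoc. */
    enum Emitted { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

    static Emitted kindOf(StatType type) {
        switch (type) {
            case CUMULATIVE_COUNT: // assumed: a count always increases in steady state
            case TOTAL:            // "Total is converted to CUMULATIVE_DOUBLE"
                return Emitted.CUMULATIVE_DOUBLE;
            case WINDOWED_SUM:        // a sample sum, not cumulative
            case FREQUENCY_COMPONENT: // compound type unknown, so reported as a gauge
                return Emitted.GAUGE_DOUBLE;
            default:
                throw new IllegalArgumentException("Unknown stat type: " + type);
        }
    }
}
```

Meter does not appear in the lookup because, as the Javadoc explains, it is never reported as-is, only as its separate rate and total metrics.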
[jira] [Resolved] (KAFKA-15669) Implement telemetry naming strategy
[ https://issues.apache.org/jira/browse/KAFKA-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-15669. - Fix Version/s: 3.7.0 Resolution: Fixed > Implement telemetry naming strategy > --- > > Key: KAFKA-15669 > URL: https://issues.apache.org/jira/browse/KAFKA-15669 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Fix For: 3.7.0 > > > Define classes and implement telemetry metrics naming strategy for the > KIP-714 as defined here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat] > > The naming strategy must also support delta temporality metrics with a suffix > in original metric name.
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
mjsax commented on PR #14618: URL: https://github.com/apache/kafka/pull/14618#issuecomment-1790065635 Thanks for all the review help @xvrl! Merged to `trunk`.
[jira] [Resolved] (KAFKA-15668) Add Opentelemetry Proto library with shadowed classes
[ https://issues.apache.org/jira/browse/KAFKA-15668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-15668. - Fix Version/s: 3.7.0 Resolution: Fixed > Add Opentelemetry Proto library with shadowed classes > - > > Key: KAFKA-15668 > URL: https://issues.apache.org/jira/browse/KAFKA-15668 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Fix For: 3.7.0 > > > KIP-714 requires adding the [Java client > dependency|https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Javaclientdependencies] > {{opentelemetry-proto}}, which also brings in the transitive dependency > {{protobuf-java}}. The dependencies should be shadowed to avoid JVM > versioning conflicts.
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
mjsax merged PR #14618: URL: https://github.com/apache/kafka/pull/14618
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
mjsax commented on code in PR #14618: URL: https://github.com/apache/kafka/pull/14618#discussion_r1379585726 ## build.gradle: ## @@ -295,7 +296,12 @@ subprojects { } publications { mavenJava(MavenPublication) { - from components.java + if (!shouldPublishWithShadow) { +from components.java + } else { +apply plugin: 'com.github.johnrengelman.shadow' Review Comment: Thanks. Yes, I understood already what the plugin does. > This dependency is not introduced in this PR rather being already used in Kafka for a while: This was my concern, but if it's already established my concerns are void.
Re: [PR] KAFKA-15614: [Minor] Renaming getter and correcting code comments [kafka]
mjsax commented on PR #14684: URL: https://github.com/apache/kafka/pull/14684#issuecomment-1790059069 Thanks for the follow up. Merged to `trunk`.
Re: [PR] KAFKA-15614: [Minor] Renaming getter and correcting code comments [kafka]
mjsax merged PR #14684: URL: https://github.com/apache/kafka/pull/14684
Re: [PR] KAFKA-14957: Update-Description-String [kafka]
mjsax commented on PR #13909: URL: https://github.com/apache/kafka/pull/13909#issuecomment-1790049927 Seems I missed your last comment -- sorry about this. The bot comment just surfaced it to me. > May I ask how do we want to change the default value? I mean, if we are to provide something custom instead of using the default value. What would be that custom thingy? Note, we only want a way to overwrite what we put into the docs when generating them. Right now we call `config(configName)` and it gives us the default value (which might depend on the machine we are running on). For the html generation part, we want something like `html(configName)` which would be something like: ``` if (htmlOverwrite(configName) != null) { return htmlOverwrite(configName); } else { return config(configName); } ``` For most configs no html overwrite exists, but for the `state.dir` config we would set it to something meaningful that makes sense in the docs, and we need some code to actually add an optional html overwrite when we define a config: ``` CONFIG = new ConfigDef() .define(STATE_DIR_CONFIG, Type.STRING, System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams", "${java.io.tmpdir}", /* this is the new line to pass in the html overwrite */ Importance.HIGH, STATE_DIR_DOC) ``` Does this help?
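The proposal in the comment above can be tried out in isolation. The following is a minimal sketch, assuming a toy registry rather than Kafka's real `ConfigDef`: the class `DocConfigSketch` and its three-argument `define` overload are hypothetical, and `html(...)` and the overwrite lookup simply follow the names used in the comment. No such overload is claimed to exist in Kafka today.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy registry; NOT Kafka's ConfigDef. It only illustrates the
// "use the html overwrite if present, else the real default" idea from the comment.
final class DocConfigSketch {
    private final Map<String, Object> defaults = new HashMap<>();
    private final Map<String, String> htmlOverwrites = new HashMap<>();

    /** Defines a config without a docs-only overwrite. */
    DocConfigSketch define(final String name, final Object defaultValue) {
        return define(name, defaultValue, null);
    }

    /** Defines a config with an optional docs-only overwrite (may be null). */
    DocConfigSketch define(final String name, final Object defaultValue, final String htmlOverwrite) {
        defaults.put(name, defaultValue);
        if (htmlOverwrite != null) {
            htmlOverwrites.put(name, htmlOverwrite);
        }
        return this;
    }

    /** Runtime default; may depend on the machine the code runs on. */
    Object config(final String name) {
        return defaults.get(name);
    }

    /** Value to render in generated docs: the overwrite if present, else the default. */
    String html(final String name) {
        final String overwrite = htmlOverwrites.get(name);
        return overwrite != null ? overwrite : String.valueOf(config(name));
    }

    public static void main(final String[] args) {
        final DocConfigSketch configs = new DocConfigSketch()
            .define("state.dir", System.getProperty("java.io.tmpdir") + "/kafka-streams", "${java.io.tmpdir}")
            .define("num.stream.threads", 1);
        System.out.println(configs.html("state.dir"));
        System.out.println(configs.html("num.stream.threads"));
    }
}
```

Keeping the overwrite separate from the default means runtime behaviour is untouched; only the docs generator would call `html(...)`.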
[PR] KIP-909 [kafka]
philipnee opened a new pull request, #14691: URL: https://github.com/apache/kafka/pull/14691 (no comment)
[jira] [Assigned] (KAFKA-15765) Remove task level metric "commit-latency"
[ https://issues.apache.org/jira/browse/KAFKA-15765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-15765: --- Assignee: Bruno Cadonna > Remove task level metric "commit-latency" > - > > Key: KAFKA-15765 > URL: https://issues.apache.org/jira/browse/KAFKA-15765 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bruno Cadonna >Priority: Blocker > Fix For: 4.0.0 > > > We stopped tracking this metric with KIP-447, but kept it for backward > compatibility reasons. It's time to remove it completely with 4.0 release. > Cf > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics] > And [https://github.com/apache/kafka/pull/8218/files#r390712211] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
mjsax commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1379562125 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: > protected is public API for classes where inheritance is allowed. While technically correct, I don't think that `ProducerConfig` is a class that is _intended_ to be extended (and we cannot make it final because we want to extend it internally). -- As a matter of fact, `ProducerConfig` should not even be instantiated by a user, but it's only public to give access to the config definitions which are all `public static final String ...` -- all other methods it has (`configName()` etc) are also just public for practical reasons and should actually be hidden from the user -- it's a very leaky abstraction from an API POV -- we actually have a similar issue for `StreamsConfig`, which is even worse, and for which I am considering a KIP to clean it up: https://github.com/apache/kafka/pull/14548 > Pointing to a commit where you did something doesn't quite provide evidence that it's following the process. ;) Sure, but I wanted to point out that many people did consider this ok in the past, so why do we think it was not ok...? If you are not going to revert this change if we merge it, I would just move forward with this PR as-is, not doing a KIP (as it's highly questionable)... If you insist on a KIP, Sophie please just write one and skip the discussion so we can VOTE it directly -- but it seems totally unnecessary work to me. And if you know me, I am _always_ in favor of doing things the right way if there is the slightest reason for it -- but for this case, I cannot see any reason for a KIP at all.
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1379530923 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -127,14 +131,31 @@ public QueryResult query( final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +final Position position = result.getPosition(); + +if (result.isSuccess()) { +if (result.getResult() instanceof byte[]) { +final byte[] plainValue = (byte[]) result.getResult(); +final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue); +result = (QueryResult) QueryResult.forResult(valueWithTimestamp); Review Comment: Should we use `InternalQueryResultUtil.copyAndSubstituteDeserializedResult` instead, to create a copy of the result correctly? ## streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyQuery.java: ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.util.Objects; + +/** + * Interactive query for retrieving a single record based on its key. + */ +@Evolving +public final class TimestampedKeyQuery implements Query> { + +private final K key; +private final boolean skipCache; + +private TimestampedKeyQuery(final K key, final boolean skipCache) { +this.key = Objects.requireNonNull(key); Review Comment: I think this check should go into `withKey()` -- we should also add an error message (Might be good to clean this up, inside `KeyQuery`, too) ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -180,5 +201,38 @@ public , P> KeyValueIterator prefixScan( public long approximateNumEntries() { return store.approximateNumEntries(); } +} +class WrappedRocksDbIterator extends AbstractIterator implements ManagedKeyValueIterator { Review Comment: Wondering if we should only `implements ManagedKeyValueIterator` 🤔 -- what do we gain/need from `AbstractIterator`? We should for sure set the correct types: ``` ... extends AbstractIterator> implements ManagedKeyValueIterator ``` to fix the `Object` return type on some methods.
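The review suggestion about the null check (move it from the private constructor into `withKey()` and attach an error message) might look roughly like the sketch below. `TimestampedKeyQuerySketch` is a hypothetical stand-in for the class under review, the message text is invented for illustration, and the `skipCache()` method is assumed from the `skipCache` field visible in the diff.

```java
import java.util.Objects;

// Hypothetical stand-in for the query class under review; only the placement
// of the null check is the point. The error message text is an assumption.
final class TimestampedKeyQuerySketch<K> {
    private final K key;
    private final boolean skipCache;

    // The private constructor trusts its callers; validation lives in the factory.
    private TimestampedKeyQuerySketch(final K key, final boolean skipCache) {
        this.key = key;
        this.skipCache = skipCache;
    }

    /** Public entry point: fail fast, with a message, on a null key. */
    static <K> TimestampedKeyQuerySketch<K> withKey(final K key) {
        Objects.requireNonNull(key, "the key should not be null");
        return new TimestampedKeyQuerySketch<>(key, false);
    }

    /** Returns a copy that bypasses the cache (method assumed from the skipCache field). */
    TimestampedKeyQuerySketch<K> skipCache() {
        return new TimestampedKeyQuerySketch<>(key, true);
    }

    K key() {
        return key;
    }

    boolean isSkipCache() {
        return skipCache;
    }

    public static void main(final String[] args) {
        final TimestampedKeyQuerySketch<String> query = TimestampedKeyQuerySketch.withKey("user-1");
        System.out.println(query.key() + " skipCache=" + query.isSkipCache());
    }
}
```

Checking in the factory means the exception points at the user-facing call, and internal copies such as `skipCache()` do not repeat a check on a key that was already validated.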
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1379554731 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -127,14 +131,31 @@ public QueryResult query( final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +final Position position = result.getPosition(); + +if (result.isSuccess()) { +if (result.getResult() instanceof byte[]) { +final byte[] plainValue = (byte[]) result.getResult(); +final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue); +result = (QueryResult) QueryResult.forResult(valueWithTimestamp); +} +if (result.getResult() instanceof RocksDbIterator) { +final WrappedRocksDbIterator wrappedRocksDBRangeIterator = new WrappedRocksDbIterator((RocksDbIterator) result.getResult()); +result = (QueryResult) QueryResult.forResult(wrappedRocksDBRangeIterator); +} Review Comment: ok
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1379550244 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -127,14 +131,31 @@ public QueryResult query( final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +final Position position = result.getPosition(); + +if (result.isSuccess()) { +if (result.getResult() instanceof byte[]) { +final byte[] plainValue = (byte[]) result.getResult(); +final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue); +result = (QueryResult) QueryResult.forResult(valueWithTimestamp); +} +if (result.getResult() instanceof RocksDbIterator) { Review Comment: ok
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1379550064 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -127,14 +131,31 @@ public QueryResult query( final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +final Position position = result.getPosition(); + +if (result.isSuccess()) { +if (result.getResult() instanceof byte[]) { Review Comment: ok
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ijuma commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1379548300 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: Private methods are not part of the API.
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1789989071 The test failures seem unrelated, but I am retriggering the tests because the build failed.
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1378442872 ## streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.kafka.streams.query; + + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + + +import java.util.Optional; + +/** + * Interactive query for issuing range queries and scans over KeyValue stores. + * + * A range query retrieves a set of records, specified using an upper and/or lower bound on the keys. + * + * A scan query retrieves all records contained in the store. + * Review Comment: ok
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1367565896 ## streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.kafka.streams.query; + + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + + +import java.util.Optional; + +/** + * Interactive query for issuing range queries and scans over KeyValue stores. + * + * A range query retrieves a set of records, specified using an upper and/or lower bound on the keys. + * + * A scan query retrieves all records contained in the store. + * Review Comment: Unnecessary tag. -- Also, generic parameters should be described -- we should also describe ordering guarantees better (we can wait for your other PR to get finished, and mimic here what we did there).
Re: [PR] KAFKA-15110:Fix the fail start due to the existence of multiple version packages under libs [kafka]
hudeqi closed pull request #13893: KAFKA-15110:Fix the fail start due to the existence of multiple version packages under libs URL: https://github.com/apache/kafka/pull/13893
Re: [PR] KAFKA-15129;[9/N] Remove metrics in network package when broker shutdown [kafka]
hudeqi closed pull request #14043: KAFKA-15129;[9/N] Remove metrics in network package when broker shutdown URL: https://github.com/apache/kafka/pull/14043
Re: [PR] KAFKA-15129;[6/N] Remove metrics in ControllerStats when broker shutdown [kafka]
hudeqi closed pull request #13960: KAFKA-15129;[6/N] Remove metrics in ControllerStats when broker shutdown URL: https://github.com/apache/kafka/pull/13960
Re: [PR] KAFKA-15129;[8/N] Remove metrics in KafkaRequestHandlerPool when broker shutdown [kafka]
hudeqi closed pull request #14015: KAFKA-15129;[8/N] Remove metrics in KafkaRequestHandlerPool when broker shutdown URL: https://github.com/apache/kafka/pull/14015
Re: [PR] KAFKA-15129;[10/N] Remove metrics in log when broker shutdown [kafka]
hudeqi closed pull request #14058: KAFKA-15129;[10/N] Remove metrics in log when broker shutdown URL: https://github.com/apache/kafka/pull/14058
Re: [PR] KAFKA-15129;[5/N] Remove metrics in ControllerChannelManager when broker shutdown [kafka]
hudeqi closed pull request #13959: KAFKA-15129;[5/N] Remove metrics in ControllerChannelManager when broker shutdown URL: https://github.com/apache/kafka/pull/13959
Re: [PR] KAFKA-15129;[4/N] Remove metrics in Partition when broker shutdown and stopPartitions [kafka]
hudeqi closed pull request #13952: KAFKA-15129;[4/N] Remove metrics in Partition when broker shutdown and stopPartitions URL: https://github.com/apache/kafka/pull/13952
Re: [PR] KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown [kafka]
hudeqi closed pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown URL: https://github.com/apache/kafka/pull/13929
Re: [PR] KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown [kafka]
hudeqi closed pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown URL: https://github.com/apache/kafka/pull/13926
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1379524385 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -127,14 +131,31 @@ public QueryResult query( final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +final Position position = result.getPosition(); + +if (result.isSuccess()) { +if (result.getResult() instanceof byte[]) { +final byte[] plainValue = (byte[]) result.getResult(); +final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue); +result = (QueryResult) QueryResult.forResult(valueWithTimestamp); +} +if (result.getResult() instanceof RocksDbIterator) { +final WrappedRocksDbIterator wrappedRocksDBRangeIterator = new WrappedRocksDbIterator((RocksDbIterator) result.getResult()); +result = (QueryResult) QueryResult.forResult(wrappedRocksDBRangeIterator); +} Review Comment: We should chain this with an `else` that throws an exception (we should never hit the `else`; if we do, it is a bug, and throwing an exception would surface it clearly).
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1379523818 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -127,14 +131,31 @@ public QueryResult query( final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +final Position position = result.getPosition(); + +if (result.isSuccess()) { +if (result.getResult() instanceof byte[]) { +final byte[] plainValue = (byte[]) result.getResult(); +final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue); +result = (QueryResult) QueryResult.forResult(valueWithTimestamp); +} +if (result.getResult() instanceof RocksDbIterator) { Review Comment: We should chain this with an `else` to the previous `if`.
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1379523619 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -127,14 +131,31 @@ public QueryResult query( final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +final Position position = result.getPosition(); + +if (result.isSuccess()) { +if (result.getResult() instanceof byte[]) { Review Comment: I think we should actually check the input `query` type instead: ``` if (query instanceof KeyQuery || query instanceof TimestampedKeyQuery) ```
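The three review comments on this hunk (dispatch on the input `query` type, chain the branches with `else`, and throw in the final `else`) can be sketched together as follows. This is only an illustrative sketch: every type below is a hypothetical stand-in declared locally, not the real Kafka Streams class of the same name, and the returned strings merely mark which branch ran.

```java
// Illustrative sketch only; the nested types are hypothetical stand-ins,
// not the real org.apache.kafka.streams.query classes.
final class QueryDispatchSketch {
    interface Query {}
    static final class KeyQuery implements Query {}
    static final class TimestampedKeyQuery implements Query {}
    static final class RangeQuery implements Query {}
    static final class TimestampedRangeQuery implements Query {}
    static final class OtherQuery implements Query {}

    // Decide how to convert a result based on the query type, not the result type.
    static String conversionFor(final Query query) {
        if (query instanceof KeyQuery || query instanceof TimestampedKeyQuery) {
            // Single value: this is where a plain value would be converted.
            return "single-value";
        } else if (query instanceof RangeQuery || query instanceof TimestampedRangeQuery) {
            // Iterator: this is where the iterator would be wrapped.
            return "iterator";
        } else {
            // Should be unreachable; failing loudly surfaces the bug if it is ever hit.
            throw new IllegalStateException(
                "Unexpected query type: " + query.getClass().getName());
        }
    }
}
```

Because the branches are chained, at most one conversion runs per call, and an unhandled query type fails immediately instead of silently returning an unconverted result.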
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1379511059 ## streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java: ## @@ -27,6 +27,7 @@ public final class VersionedRecord { private final V value; private final long timestamp; +private final long validTo; Review Comment: Btw: given that `VersionedKeyQuery` also returns a `VersionedRecord`, I believe we need to update it to set this timestamp correctly for historic queries? If we query "latest" there is nothing to be done, but if we query an older version based on an `asOfTimestamp` we might need to take care of this case? Might also be worth mentioning this on the KIP.
Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]
hudeqi commented on PR #14667: URL: https://github.com/apache/kafka/pull/14667#issuecomment-1789956368 > > I am restarting the CI. There are large number of test failures including some TS ones. Let's wait to have a saner CI build. > > seems new CI failed tests are unrelated. @divijvaidya The only failed TS test, org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest, passes in my local env.
Re: [PR] KAFKA-15602: revert KAFKA-4852 [kafka]
mjsax commented on PR #14617: URL: https://github.com/apache/kafka/pull/14617#issuecomment-1789953849 Merged to `trunk` and cherry-picked to `3.6`, `3.5`, and `3.4` branches.
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1379469860 ## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ Review Comment: add `@param` for key and value types ## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery implements Query>> { + +private final K key; +private final Optional fromTime; +private final Optional toTime; +private final boolean isAscending; + +private MultiVersionedKeyQuery(final K key, final Optional fromTime, final Optional toTime, final boolean isAscending) { +this.key = Objects.requireNonNull(key); +this.fromTime = fromTime; +this.toTime = toTime; +this.isAscending = isAscending; +} + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). Review Comment: Would we ever return `null` -- I think the iterator would just be empty but never be `null`? Can we extend the JavaDocs similar to `VersionedKeyQuery` and talk about default (query full history) vs optionally limit the time range scope? 
## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.an
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ableegoldman commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1379493839 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: Interesting, I hadn't heard this definition...I suppose it makes sense in general, but I would actually argue that this specific scenario demonstrates an edge case where that definition breaks down. Wouldn't this also mean that we cannot even add new private constructors to a public class, just because they would show up in the javadocs? That doesn't sound right...I can see how one could argue that package-private and protected methods are considered part of the public API and should not be changed in a backwards incompatible way. But private methods have javadocs that would change, yet they are definitely not part of the public API. (Just curious about this, since I've often wondered if we even have a specific policy defining what is/isn't, strictly speaking, considered a public API.)
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ijuma commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1379486496 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: Pointing to a commit where you did something doesn't quite provide evidence that it's following the process. ;)
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ijuma commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1379485880 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: `protected` is public API for classes where inheritance is allowed.
Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]
mjsax commented on code in PR #14596: URL: https://github.com/apache/kafka/pull/14596#discussion_r1379482103 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -157,6 +213,30 @@ protected QueryResult runKeyQuery(final Query query, throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time."); } +@SuppressWarnings("unchecked") +protected QueryResult runVersionedKeyQuery(final Query query, Review Comment: Why do we add this as `protected`? Should be `private` IMHO.
Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]
mjsax commented on code in PR #14596: URL: https://github.com/apache/kafka/pull/14596#discussion_r1379471204 ## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. + * + * @param The type of the key. + * @param The type of the value. + */ +@Evolving +public final class VersionedKeyQuery implements Query> { + +private final K key; +private final Optional asOfTimestamp; + +private VersionedKeyQuery(final K key, final Optional asOfTimestamp) { +this.key = Objects.requireNonNull(key); Review Comment: This null check should go into `withKey(...)` (to get flatter stack traces). We should also add an error message: `Objects.requireNonNull(key, "key cannot be null");`
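A minimal sketch of the suggestion in the review comment above, assuming a simplified class shape: the null check moves out of the private constructor into the static `withKey(...)` factory and carries the message from the comment. `VersionedKeyQuerySketch` and its accessors are hypothetical illustration names, not the class under review.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified stand-in for the class under review.
final class VersionedKeyQuerySketch<K> {
    private final K key;
    private final Optional<Instant> asOfTimestamp;

    // The constructor no longer validates; callers go through withKey(...).
    private VersionedKeyQuerySketch(final K key, final Optional<Instant> asOfTimestamp) {
        this.key = key;
        this.asOfTimestamp = asOfTimestamp;
    }

    // Validating in the factory keeps the failing frame one level closer to the caller.
    static <K> VersionedKeyQuerySketch<K> withKey(final K key) {
        Objects.requireNonNull(key, "key cannot be null");
        return new VersionedKeyQuerySketch<>(key, Optional.empty());
    }

    K key() {
        return key;
    }

    Optional<Instant> asOfTimestamp() {
        return asOfTimestamp;
    }
}
```

With this shape, `withKey(null)` throws a `NullPointerException` whose message names the offending argument, and the top stack frame is the factory method the user actually called.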
[jira] [Resolved] (KAFKA-15730) KRaft support in ProducerFailureHandlingTest
[ https://issues.apache.org/jira/browse/KAFKA-15730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sameer Tejani resolved KAFKA-15730.
---
Fix Version/s: 3.7.0
Resolution: Fixed
> KRaft support in ProducerFailureHandlingTest
>
> Key: KAFKA-15730
> URL: https://issues.apache.org/jira/browse/KAFKA-15730
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Priority: Minor
> Labels: kraft, kraft-test, newbie
> Fix For: 3.7.0
>
> The following tests in ProducerFailureHandlingTest in core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala need to be updated to support KRaft
> 87 : def testTooLargeRecordWithAckZero(): Unit = {
> 104 : def testTooLargeRecordWithAckOne(): Unit = {
> 133 : def testPartitionTooLargeForReplicationWithAckAll(): Unit = {
> 139 : def testResponseTooLargeForReplicationWithAckAll(): Unit = {
> 147 : def testNonExistentTopic(): Unit = {
> 164 : def testWrongBrokerList(): Unit = {
> 181 : def testInvalidPartition(): Unit = {
> 195 : def testSendAfterClosed(): Unit = {
> 215 : def testCannotSendToInternalTopic(): Unit = {
> 223 : def testNotEnoughReplicas(): Unit = {
> 236 : def testNotEnoughReplicasAfterBrokerShutdown(): Unit = {
> Scanned 260 lines. Found 0 KRaft tests out of 11 tests
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15731) KRaft support in ListOffsetsIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sameer Tejani resolved KAFKA-15731.
---
Fix Version/s: 3.7.0
Resolution: Fixed
> KRaft support in ListOffsetsIntegrationTest
>
> Key: KAFKA-15731
> URL: https://issues.apache.org/jira/browse/KAFKA-15731
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Priority: Minor
> Labels: kraft, kraft-test, newbie
> Fix For: 3.7.0
>
> The following tests in ListOffsetsIntegrationTest in core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala need to be updated to support KRaft
> 55 : def testEarliestOffset(): Unit = {
> 61 : def testLatestOffset(): Unit = {
> 67 : def testMaxTimestampOffset(): Unit = {
> Scanned 96 lines. Found 0 KRaft tests out of 3 tests
[jira] [Resolved] (KAFKA-15732) KRaft support in LogAppendTimeTest
[ https://issues.apache.org/jira/browse/KAFKA-15732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sameer Tejani resolved KAFKA-15732.
---
Fix Version/s: 3.7.0
Resolution: Fixed
> KRaft support in LogAppendTimeTest
>
> Key: KAFKA-15732
> URL: https://issues.apache.org/jira/browse/KAFKA-15732
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Priority: Minor
> Labels: kraft, kraft-test, newbie
> Fix For: 3.7.0
>
> The following tests in LogAppendTimeTest in core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala need to be updated to support KRaft
> 50 : def testProduceConsume(): Unit = {
> Scanned 78 lines. Found 0 KRaft tests out of 1 tests
[jira] [Resolved] (KAFKA-15733) KRaft support in FetchRequestMaxBytesTest
[ https://issues.apache.org/jira/browse/KAFKA-15733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sameer Tejani resolved KAFKA-15733.
---
Fix Version/s: 3.7.0
Resolution: Fixed
> KRaft support in FetchRequestMaxBytesTest
>
> Key: KAFKA-15733
> URL: https://issues.apache.org/jira/browse/KAFKA-15733
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Priority: Minor
> Labels: kraft, kraft-test, newbie
> Fix For: 3.7.0
>
> The following tests in FetchRequestMaxBytesTest in core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala need to be updated to support KRaft
> 105 : def testConsumeMultipleRecords(): Unit = {
> Scanned 134 lines. Found 0 KRaft tests out of 1 tests
[jira] [Commented] (KAFKA-15753) KRaft support in BrokerApiVersionsCommandTest
[ https://issues.apache.org/jira/browse/KAFKA-15753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781948#comment-17781948 ]
Sameer Tejani commented on KAFKA-15753:
---
[~linzihao1999] if these issues are simple enough (if you look at other PRs that have enabled KRaft mode by using `kafka.utils.TestInfoUtils`) then let's go ahead and implement them now so that we can get more coverage on KRaft.
> KRaft support in BrokerApiVersionsCommandTest
>
> Key: KAFKA-15753
> URL: https://issues.apache.org/jira/browse/KAFKA-15753
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Priority: Minor
> Labels: kraft, kraft-test, newbie
>
> The following tests in BrokerApiVersionsCommandTest in core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala need to be updated to support KRaft
> 50 : def checkBrokerApiVersionCommandOutput(): Unit = {
> Scanned 80 lines. Found 0 KRaft tests out of 1 tests
[jira] [Commented] (KAFKA-14419) Failed SyncGroup leading to partitions lost due to processing during rebalances
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781946#comment-17781946 ] Matthias J. Sax commented on KAFKA-14419: - [~kirktrue] [~pnee] – will the new consumer threading help with this? Maybe it's not something we should try to fix at the Streams layer to begin with? > Failed SyncGroup leading to partitions lost due to processing during > rebalances > --- > > Key: KAFKA-14419 > URL: https://issues.apache.org/jira/browse/KAFKA-14419 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 > Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64 >Reporter: Mikael >Priority: Major > > Trigger scenario: > Four Kafka client application instances on separate EC2 instances with a > total of 8 active and 8 standby stream tasks for the same stream topology, > consuming from an input topic with 8 partitions. Sometimes a handful of > messages are consumed twice by one of the stream tasks when stream tasks on > another application instance join the consumer group after an application > instance restart. > Additional information: > Messages are produced to the topic by another Kafka streams topology deployed > on the same four application instances. I have verified that each message is > only produced once by enabling debug logging in the topology flow right > before producing each message to the topic. 
> Logs from stream thread with duplicate consumption: > > {code:java} > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is > already rebalancing > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > Input records consumed for the first time > 2022-11-21 15:09:33,919 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,920 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began > another rebalance. Need to re-join the group. 
Sent generation was > Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,923 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as > lost since generation/me
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
mjsax commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1379448271 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: We did a similar change w/o a KIP in the past (https://github.com/apache/kafka/commit/487b95454233e981a65c184c7f3b86adf34058ef) Actually similar for `ConsumerConfig` (https://github.com/apache/kafka/commit/283a19481d1ce4a77f5f465e7b96288db22a8ff1) -- note that while this PR is part of a KIP, the KIP is about something totally different and does not mention this change at all. I agree it's borderline, but believe it's ok. -- In the end, I would rather exclude `protected` stuff from the JavaDocs as I don't think `protected` is actually public API?
[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781935#comment-17781935 ] Matthias J. Sax commented on KAFKA-15602: - [~luke.kirby] – it seems unlikely that there will be a 3.4.2 bug-fix release – I am actually surprised that we do a 3.5.2... Historically, we focus on bug-fix releases for the current release minor/major version (ie, currently 3.6) only. > Breaking change in 3.4.0 ByteBufferSerializer > - > > Key: KAFKA-15602 > URL: https://issues.apache.org/jira/browse/KAFKA-15602 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: Luke Kirby >Assignee: Matthias J. Sax >Priority: Critical > Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1 > > > [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have > solved the situation described by KAFKA-4852, namely, to have > ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 > offsets (or, put another way, to honor the buffer's position() as the start > point to consume bytes from). Unfortunately, it failed to actually do this, > and instead changed the expectations for how an input ByteBuffer's limit and > position should be set before being provided to send() on a producer > configured with ByteBufferSerializer. Code that worked with pre-3.4.0 > releases now produce 0-length messages instead of the intended messages, > effectively introducing a breaking change for existing users of the > serializer in the wild. 
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 3.4.0+ to summarize the breaking change:
>
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
>
> Notably, plain wrappers of byte arrays continue to work under both versions due to the special case in the serializer for them. I suspect that this is the dominant use case, which is why this has apparently gone unreported to this point. The wrapped-with-offset case fails under both versions for different reasons (the expected value would be "est"). As demonstrated here, you can ensure that a manually assembled ByteBuffer will work under both versions by ensuring that your buffers have position == limit == message-length (and an actual desired start position of 0). Clearly, though, behavior has changed dramatically for the second and third cases there, with the 3.3.2 behavior, in my experience, aligning better with naive expectations.
>
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java], the serializer would just rewind() the buffer and respect the limit as the indicator as to how much data was in the buffer. So, essentially, the prevailing contract was that the data from position 0 (always!) up to the limit on the buffer would be serialized; so it was really just the limit that was honored.
So if, per the original issue, you have a byte[] array wrapped > with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() > with position = 3 indicating the desired start point to read from, but > effectively ignored by the serializer due to the rewind(). > So while the serializer didn't work when presenting a ByteBuffer view onto a > sub-view of a backing array, it did however follow expected behavior when > employing standard patterns to populate ByteBuffers backed by > larger-than-necessary arrays and using limit() to identify the end of actual > data, consistent with conventional usage of flip() to switch from writing to > a buffer to setting it up to be read from (e.g., to be passed into a > producer.send() call). E.g., > {code:java} > ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH); > ... // some sequence of > bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH > ... > bb.flip(); /* logically, this says "I am done writing, let's set this up for > reading"; pragmatically, it sets the limit to the current position so that > whoever reads the buffer knows when to stop reading, and sets the position to > zero so it knows where to start reading from */ > producer.send(bb); {co
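To make the position/limit discussion in the issue above concrete, here is a minimal sketch that contrasts the two contracts. It is not Kafka code: the class and method names are hypothetical, `rewindStyle` follows the issue's description of the pre-3.4.0 behavior (rewind, then read up to the limit), and `flipStyle` is only an assumption about 3.4.0+ (a plain-wrapper shortcut followed by `flip()`) chosen because it reproduces the 3.4.0+ column of the table.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of Kafka.
class ByteBufferContractDemo {

    // Pre-3.4.0 contract as described in the issue: rewind(), then read up to the limit.
    static byte[] rewindStyle(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // independent position/limit, shared bytes
        view.rewind();                      // position = 0, limit unchanged
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // ASSUMPTION: emulates the 3.4.0+ column of the table with a plain-wrapper
    // shortcut followed by flip(); inferred from the table, not copied from Kafka.
    static byte[] flipStyle(ByteBuffer data) {
        ByteBuffer view = data.duplicate();
        if (view.hasArray() && view.arrayOffset() == 0
                && view.array().length == view.remaining()) {
            return view.array();            // plain wrap(bytes): return the whole array
        }
        view.flip();                        // limit = old position, position = 0
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // The conventional "write, then flip()" pattern from the issue text.
    static ByteBuffer writeThenFlip(String s) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.put(s.getBytes(StandardCharsets.UTF_8));
        bb.flip();                          // position = 0, limit = bytes written
        return bb;
    }

    public static void main(String[] args) {
        ByteBuffer flipped = writeThenFlip("test");
        System.out.println("rewind-style length: " + rewindStyle(flipped).length); // 4
        System.out.println("flip-style length:   " + flipStyle(flipped).length);   // 0
    }
}
```

Under these assumptions, a buffer that the caller has already flipped is read in full by the rewind-style contract, but comes out empty when the reader flips it a second time, which matches the second row of the table.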
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
apoorvmittal10 commented on PR #14618: URL: https://github.com/apache/kafka/pull/14618#issuecomment-1789838514 @wcarlson5 @mjsax 16 tests unrelated to the PR are failing. Since we have a +1 from @xvrl, can we please merge this PR now? Thanks for the help.
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on PR #14489: URL: https://github.com/apache/kafka/pull/14489#issuecomment-1789834583 Newest test failures look unrelated: [Build / JDK 17 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserFails(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_17_and_Scala_2_13___testDescribeTokenForOtherUserFails_String__quorum_kraft/) [Build / JDK 11 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testRackAwareRangeAssignor_String__quorum_zk/) [Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/) [Build / JDK 21 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_21_and_Scala_2_13___testRackAwareRangeAssignor_String__quorum_zk/) [Build / JDK 21 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaSubscribe(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_21_and_Scala_2_13___testNoConsumeWithDescribeAclViaSubscribe_String__quorum_kraft/) [Build / JDK 21 
and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserPasses(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_21_and_Scala_2_13___testDescribeTokenForOtherUserPasses_String__quorum_kraft/) [Build / JDK 21 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14489/5/testReport/junit/kafka.server/DescribeClusterRequestTest/Build___JDK_21_and_Scala_2_13___testDescribeClusterRequestExcludingClusterAuthorizedOperations_String__quorum_kraft/) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1379428164 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Properties; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. + */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager getInstance() { Review Comment: Done for both `getInstance` and `getDefaultProperties`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on PR #14690: URL: https://github.com/apache/kafka/pull/14690#issuecomment-1789830091 BTW, although it's hard for someone of really little brain like myself to follow at times, the chained `Future` mechanism is very interesting and useful here. My one concern is that we should make sure those `Future`s cannot be completed by the application thread, as that would then kick off all the chained logic in the application thread. That would be bad 😆
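The threading concern in the comment above can be illustrated with a small sketch. It assumes the chained futures are `java.util.concurrent.CompletableFuture` instances (the comment only says `Future`), and the class and method names are hypothetical. A non-async stage such as `thenApply`, registered before the source future completes, runs on whichever thread calls `complete()`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; not part of the PR.
class ChainedFutureThreadDemo {

    // Returns the name of the thread that ran the chained (non-async) stage.
    static String threadRunningChainedStage(String completerName) {
        CompletableFuture<String> source = new CompletableFuture<>();
        // Registered before completion, so the stage fires when source completes.
        CompletableFuture<String> chained =
                source.thenApply(ignored -> Thread.currentThread().getName());

        // Complete the source from a separately named thread.
        Thread completer = new Thread(() -> source.complete("done"), completerName);
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return chained.join();
    }

    public static void main(String[] args) {
        // The chained stage reports the completing thread's name.
        System.out.println(threadRunningChainedStage("application-thread"));
    }
}
```

In this sketch the chained logic reports the name of the completing thread, which is why letting the application thread complete such a future would pull the whole chain onto it. Using the `*Async` variants with an explicit executor is one common way to avoid that, though whether it fits this PR is not something the thread states.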
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1379426628 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2619,6 +2619,9 @@ private ConfigEntry.ConfigSource configSource(DescribeConfigsResponse.ConfigSour case DYNAMIC_BROKER_LOGGER_CONFIG: configSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG; break; +case CLIENT_METRICS_CONFIG: Review Comment: Sounds good, thanks for letting me know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1379392210 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -729,6 +729,13 @@ protected MetadataRequest.Builder newMetadataRequestBuilderForNewTopics() { return null; } +/** + * @return Mapping from topic IDs to topic names for all topics in the cache. + */ +public synchronized Map topicNames() { Review Comment: Now that I'm noticing, is this a public API violation? It's not in `internals` and we're adding a `public` method 🤔 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -73,30 +84,55 @@ public interface MembershipManager { /** * @return Current assignment for the member. */ -ConsumerGroupHeartbeatResponseData.Assignment currentAssignment(); +Set currentAssignment(); /** - * Update the assignment for the member, indicating that the provided assignment is the new - * current assignment. + * @return Assignment that the member received from the server but hasn't finished processing + * yet. */ -void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment); +Optional targetAssignment(); Review Comment: `targetAssignment()` looks to only be used by unit tests at the moment. Does it make sense to remove it from the interface and leave it as a method on the implementation only? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -260,42 +743,16 @@ public Optional serverAssignor() { * {@inheritDoc} */ @Override -public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { +public Set currentAssignment() { return this.currentAssignment; } /** * @return Assignment that the member received from the server but hasn't completely processed - * yet. Visible for testing. 
Review Comment: I made a comment up above about removing `targetAssignment()` from the `MembershipManager` interface because it was only used for testing. Does the removal of this statement imply that it will be used in non-testing later? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -24,21 +24,34 @@ public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member is not part of the group. This could be the case when it has never joined (no call + * has been made to the subscribe API), or when the member intentionally leaves the group + * after a call to the unsubscribe API. */ -UNJOINED, +NOT_IN_GROUP, + +/** + * Member is attempting to join a consumer group. This could be the case when joining for the + * first time, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or * revoked), and it is processing it. While in this state, the member will * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make * the new assignment effective. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state until the + * next heartbeat request is sent out to acknowledge the assignment to the server. Review Comment: As I understand, this is saying the consumer will leave the `ACKNOWLEDGING_RECONCILED_ASSIGNMENT` state as soon as the next heartbeat is sent off, rather than the next heartbeat is received, right? What happens if that heartbeat request gets lost? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -24,21 +24,34 @@ public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member is not part of the group. This could be the case when it has never joined (no call + * has been made to the subscribe API), or when the member intentionally leaves the group + * after a call to the unsubscribe API. */ -UNJOINED, +NOT_IN_GROUP, Review Comment: This is the case where the consumer is not in a group _presently_, right? A consumer without a configured group ID wouldn't get to this point, would it? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -73,30 +84,55 @@ public interface MembershipManager { /** * @return Cu
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on PR #14387: URL: https://github.com/apache/kafka/pull/14387#issuecomment-1789827363 I took a first pass at the files today, and will take a closer look at the metrics themselves tomorrow or so.
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1379424990 ## checkstyle/import-control.xml: ## @@ -240,6 +240,7 @@ + Review Comment: Are we planning not to add any of the new sensor metrics though? I assume that's what the description is referring to when it says we need to wait for other things to get the sensors. > Metrics to add after https://github.com/apache/kafka/pull/14408 is merged: >offset deletions sensor (OffsetDeletions); Meter(offset-deletion-rate, offset-deletion-count) Metrics to add after https://issues.apache.org/jira/browse/KAFKA-14987 is merged: >offset expired sensor (OffsetExpired); Meter(offset-expiration-rate, offset-expiration-count) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1379425234 ## clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java: ## @@ -222,6 +222,7 @@ public enum ConfigSource { DYNAMIC_BROKER_LOGGER_CONFIG, // dynamic broker logger config that is configured for a specific broker DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster +DYNAMIC_CLIENT_METRICS_CONFIG, // dynamic client metrics subscription config that is configured for all clients Review Comment: I might be wrong, but isn't this needed for `kafka-configs.sh` to create a `subscription` for client_metrics? To support the `kafka-configs.sh` utility, we would need changes in `ConfigEntry` and `ConfigSource`. I have this PR that uses the same: https://github.com/apache/kafka/pull/14632
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
lzyLuke commented on PR #14682: URL: https://github.com/apache/kafka/pull/14682#issuecomment-1789818408
> Thanks for the PR, @lzyLuke.
>
> Please remove these two lines from "The following features are not fully implemented in KRaft mode":
>
> ```
> Modifying certain dynamic configurations on the standalone KRaft controller
> ```
>
> * Delegation tokens

I could not find any lines related to "Delegation tokens". Perhaps they have already been deleted?
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jeffkbkim commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1379419320 ## checkstyle/import-control.xml: ## @@ -240,6 +240,7 @@ + Review Comment: yeah, unfortunately we need this for the existing metrics
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1379418802 ## checkstyle/import-control.xml: ## @@ -240,6 +240,7 @@ + Review Comment: Or do we have this just to cover the existing generic metrics?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1379416743 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Given this is separate to the requestLocal issue, does it make sense to file a JIRA and fix there? We can still consider it for a patch release if we think this is a regression. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jeffkbkim commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1379393118 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -537,22 +533,34 @@ public Map> partitionsPendingAssignment() { public String currentAssignmentSummary() { return "CurrentAssignment(memberEpoch=" + memberEpoch + ", previousMemberEpoch=" + previousMemberEpoch + -", targetMemberEpoch=" + targetMemberEpoch + ", state=" + state + ", assignedPartitions=" + assignedPartitions + -", partitionsPendingRevocation=" + partitionsPendingRevocation + -", partitionsPendingAssignment=" + partitionsPendingAssignment + +", revokedPartitions=" + revokedPartitions + ')'; } +/** + * @return True if the assignment of this member is equals to the assignment Review Comment: nit: is equal to ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -511,24 +507,24 @@ public MemberState state() { } /** - * @return The set of assigned partitions. + * @return True of the member is in the Stable state and at the desired epoch. Review Comment: nit: True if ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. 
+ * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. + * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. 
The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE + * state if it is fully reconciled. * - * - ASSIGNING: - * This state means that the member waits on partitions which are still owned by other members in the - * group. It remains in this state until they are all freed up. + * - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT + * When the assignment is acknowledged, the member remains in the + * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed. * - * - STABLE: - * This state means that the member has received all its assigned partitions. + * If the next assi
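The transitions listed in the quoted Javadoc above can be sketched as a small lookup table. This is a hypothetical illustration, not the `CurrentAssignmentBuilder` code: the class and method names are made up, and only the transitions visible before the quote is cut off are recorded, so the table is deliberately incomplete.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; the real state machine lives in the group-coordinator module.
class AssignmentStateSketch {

    enum State { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    // Only the transitions visible in the quoted (truncated) Javadoc are listed.
    private static final Map<State, Set<State>> VISIBLE_TRANSITIONS =
            new EnumMap<>(State.class);

    static {
        VISIBLE_TRANSITIONS.put(State.STABLE,
                EnumSet.of(State.STABLE,
                           State.UNACKNOWLEDGED_ASSIGNMENT,
                           State.UNRELEASED_PARTITIONS));
        VISIBLE_TRANSITIONS.put(State.UNACKNOWLEDGED_ASSIGNMENT,
                EnumSet.of(State.STABLE, State.UNACKNOWLEDGED_ASSIGNMENT));
        // Transitions out of UNRELEASED_PARTITIONS are cut off in the quote,
        // so none are recorded here.
        VISIBLE_TRANSITIONS.put(State.UNRELEASED_PARTITIONS,
                EnumSet.noneOf(State.class));
    }

    // True if the quoted Javadoc explicitly lists the transition from -> to.
    static boolean isListed(State from, State to) {
        return VISIBLE_TRANSITIONS.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(isListed(State.STABLE, State.UNACKNOWLEDGED_ASSIGNMENT));
        System.out.println(isListed(State.UNACKNOWLEDGED_ASSIGNMENT, State.UNRELEASED_PARTITIONS));
    }
}
```

A table like this is mainly useful in tests, where each state change produced by the builder can be checked against the documented set.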
[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781922#comment-17781922 ] Philip Nee commented on KAFKA-15602: Hi [~luke.kirby] - You don't need to become a committer to write a KIP, apache committer means something different :) You just need to send an email to the mailing list to request the permission per...
{code:java}
If this is your first time contributing:
Sign up for the Developer mailing list d...@kafka.apache.org . The instructions to sign up are here: http://kafka.apache.org/contact
Create a wiki ID (https://cwiki.apache.org/confluence/signup.action)
Create a Jira ID (It's a different system than the wiki) using the ASF Self-serve Portal (https://selfserve.apache.org/jira-account.html)
Send an email to the dev mailing list (d...@kafka.apache.org) containing your wiki ID and Jira ID requesting permissions to contribute to Apache Kafka.
{code}
Looking forward to seeing the KIP!
> Breaking change in 3.4.0 ByteBufferSerializer
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
> Reporter: Luke Kirby
> Assignee: Matthias J. Sax
> Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1379411500 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorMetricsShard.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import com.yammer.metrics.core.MetricName; +import org.apache.kafka.common.TopicPartition; + +/** + * A CoordinatorMetricsShard is mapped to a single CoordinatorShard. For gauges, each metrics shard increments/decrements + * based on the operations performed. Then, {@link CoordinatorMetrics} will perform aggregations across all shards. + * + * For sensors, each shard individually records the observed values. + */ +public interface CoordinatorMetricsShard { +/** + * Increment a gauge. + * + * @param metricName the metric name. + * @param isGlobal whether the metric is globally shared across shards + */ +void increment(MetricName metricName, boolean isGlobal); Review Comment: would it be more useful to have separate methods for global vs not global? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
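To make the "separate methods for global vs not global" question above concrete, here is a minimal sketch of what such a split could look like. Everything in it is hypothetical: the names `GaugeShard`, `incrementGlobal`, `incrementLocal` and `SimpleGaugeShard` are not from the PR, and plain `String` keys stand in for the yammer `MetricName` so the sketch compiles on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class SplitGaugeSketch {

    /** Hypothetical variant of the shard interface with one method per scope. */
    public interface GaugeShard {
        void incrementGlobal(String metricName); // counter shared by all shards
        void incrementLocal(String metricName);  // counter owned by this shard only
    }

    /** Toy implementation: the global map is passed in and shared, the local map is private. */
    public static final class SimpleGaugeShard implements GaugeShard {
        private final Map<String, AtomicLong> global;
        private final Map<String, AtomicLong> local = new ConcurrentHashMap<>();

        public SimpleGaugeShard(Map<String, AtomicLong> sharedGlobal) {
            this.global = sharedGlobal;
        }

        @Override
        public void incrementGlobal(String metricName) {
            bump(global, metricName);
        }

        @Override
        public void incrementLocal(String metricName) {
            bump(local, metricName);
        }

        public long globalValue(String metricName) {
            return read(global, metricName);
        }

        public long localValue(String metricName) {
            return read(local, metricName);
        }

        private static void bump(Map<String, AtomicLong> counters, String name) {
            counters.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
        }

        private static long read(Map<String, AtomicLong> counters, String name) {
            AtomicLong value = counters.get(name);
            return value == null ? 0L : value.get();
        }
    }
}
```

With this shape the call site states the scope in the method name instead of passing a boolean, which is the readability gain the review comment seems to be after. Whether that is worth two extra methods on the real interface is a judgment call for the PR authors.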
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
hachikuji commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1379404877 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: @junrao If the lock cannot protect the write, then the validation in the group coordinator cannot be relied on. In principle, it would be possible for an older offset commit to override a newer one for example. A more likely scenario might be for a group to rebalance while an offset commit is awaiting validation. This would allow a fenced consumer to be able to commit offsets. Without the lock, I think we'd have to introduce additional bookkeeping to track dependencies between inflight writes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
CalvinConfluent commented on PR #14604:
URL: https://github.com/apache/kafka/pull/14604#issuecomment-1789790905

The failed tests are unrelated to this change.
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379402554

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -352,7 +374,14 @@ public CoordinatorResult commitOffset(
 });
 });
-return new CoordinatorResult<>(records, response);
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {

Review Comment:
nit: are my eyes playing tricks on me, or is that `_` actually two underscores (`__`)?
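For context on the double underscore: a single `_` has not been a legal identifier since Java 9, and it only returned as an unnamed variable in Java 22 (JEP 456), so `__` is a common way to mark an ignored lambda parameter on older language levels. The sketch below is illustrative only. The class and method names are made up, and the `CompletableFuture<Void>` type argument is an assumption, since the generic parameter is not visible in the diff above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class UnusedLambdaParam {

    /**
     * Registers a whenComplete callback that ignores the result (named "__")
     * and only looks at the throwable, then completes the future.
     */
    public static String outcome(boolean fail) {
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("pending");

        // "__" is an ordinary identifier; it just signals "unused" to the reader.
        appendFuture.whenComplete((__, t) ->
            seen.set(t == null ? "ok" : "failed: " + t.getMessage()));

        if (fail) {
            appendFuture.completeExceptionally(new IllegalStateException("boom"));
        } else {
            appendFuture.complete(null);
        }
        // The non-async callback has already run on the completing thread.
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(outcome(false)); // prints "ok"
        System.out.println(outcome(true));  // prints "failed: boom"
    }
}
```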
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1379398861

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ##
@@ -503,6 +547,7 @@ public void onLoaded(MetadataImage newImage) {
 MetadataDelta emptyDelta = new MetadataDelta(newImage);
 groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
 offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta);
+coordinatorMetrics.activateMetricsShard(metricsShard);

Review Comment:
Is there a reason we didn't want to deactivate the shard in onUnloaded?
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1379398861 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -503,6 +547,7 @@ public void onLoaded(MetadataImage newImage) { MetadataDelta emptyDelta = new MetadataDelta(newImage); groupMetadataManager.onNewMetadataImage(newImage, emptyDelta); offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta); +coordinatorMetrics.activateMetricsShard(metricsShard); Review Comment: It's probably not a concern for this PR, but it would be nice if the activate and deactivate could be in the same file. I think any coordinator shard could have an unload method for example. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1379393439 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -140,7 +159,12 @@ public GroupCoordinatorShard build() { throw new IllegalArgumentException("Time must be set."); if (timer == null) throw new IllegalArgumentException("Timer must be set."); +if (coordinatorMetrics == null) +throw new IllegalArgumentException("CoordinatorMetricsShard must be set."); Review Comment: I noticed we check the metrics shard to ensure it is a GroupCoordinatorMetrics shard in another file. Should we check the type sooner? Should we also check this is the type GroupCoordinatorMetrics (also should the error not include shard here)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] DO NOT MERGE: introduce internal StoreFactory [kafka]
ableegoldman commented on code in PR #14659: URL: https://github.com/apache/kafka/pull/14659#discussion_r1379390131 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ## @@ -611,33 +545,38 @@ public final void addProcessor(final String name, public final void addStateStore(final StoreBuilder storeBuilder, final String... processorNames) { +addStateStore(new StoreBuilderWrapper<>(storeBuilder), false, processorNames); +} + +public final void addStateStore(final StoreFactory storeBuilder, +final String... processorNames) { addStateStore(storeBuilder, false, processorNames); } -public final void addStateStore(final StoreBuilder storeBuilder, +public final void addStateStore(final StoreFactory storeFactory, final boolean allowOverride, final String... processorNames) { -Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); -final StateStoreFactory stateFactory = stateFactories.get(storeBuilder.name()); -if (!allowOverride && stateFactory != null && !stateFactory.builder.equals(storeBuilder)) { -throw new TopologyException("A different StateStore has already been added with the name " + storeBuilder.name()); +Objects.requireNonNull(storeFactory, "stateStoreFactory can't be null"); +final StoreFactory stateFactory = stateFactories.get(storeFactory.name()); +if (!allowOverride && stateFactory != null && !stateFactory.isCompatibleWith(storeFactory)) { +throw new TopologyException("A different StateStore has already been added with the name " + storeFactory.name()); } -if (globalStateBuilders.containsKey(storeBuilder.name())) { -throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeBuilder.name()); +if (globalStateBuilders.containsKey(storeFactory.name())) { +throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeFactory.name()); } -stateFactories.put(storeBuilder.name(), new StateStoreFactory<>(storeBuilder)); 
+stateFactories.put(storeFactory.name(), storeFactory); if (processorNames != null) { for (final String processorName : processorNames) { Objects.requireNonNull(processorName, "processor name must not be null"); -connectProcessorAndStateStore(processorName, storeBuilder.name()); +connectProcessorAndStateStore(processorName, storeFactory.name()); } } nodeGroups = null; } -public final void addGlobalStore(final StoreBuilder storeBuilder, +public final void addGlobalStore(final StoreFactory storeBuilder, Review Comment: very understandable -- just wanted to leave a comment in case you missed it/to help identify some of those two million lines 😭 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ## @@ -611,33 +545,38 @@ public final void addProcessor(final String name, public final void addStateStore(final StoreBuilder storeBuilder, final String... processorNames) { +addStateStore(new StoreBuilderWrapper<>(storeBuilder), false, processorNames); +} + +public final void addStateStore(final StoreFactory storeBuilder, +final String... processorNames) { addStateStore(storeBuilder, false, processorNames); } -public final void addStateStore(final StoreBuilder storeBuilder, +public final void addStateStore(final StoreFactory storeFactory, final boolean allowOverride, final String... 
processorNames) { -Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); -final StateStoreFactory stateFactory = stateFactories.get(storeBuilder.name()); -if (!allowOverride && stateFactory != null && !stateFactory.builder.equals(storeBuilder)) { -throw new TopologyException("A different StateStore has already been added with the name " + storeBuilder.name()); +Objects.requireNonNull(storeFactory, "stateStoreFactory can't be null"); +final StoreFactory stateFactory = stateFactories.get(storeFactory.name()); +if (!allowOverride && stateFactory != null && !stateFactory.isCompatibleWith(storeFactory)) { +throw new TopologyException("A different StateStore has already been added with the name " + storeFactory.name()); } -if (globalStateBuilders.containsKey(storeBuilder.name())) { -throw new TopologyException("
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1379389203 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -121,6 +126,20 @@ public CoordinatorShardBuilder withTimer( return this; } +@Override +public CoordinatorShardBuilder withCoordinatorMetrics( +CoordinatorMetrics coordinatorMetrics +) { +this.coordinatorMetrics = coordinatorMetrics; +return this; +} + +@Override Review Comment: for my understanding, this topic partition is only used for metrics tagging? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1379388116 ## checkstyle/import-control.xml: ## @@ -240,6 +240,7 @@ + Review Comment: I recall we removed yammer metrics from the last pr, do we also want to do that here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781911#comment-17781911 ]

Luke Kirby commented on KAFKA-15602:

Thanks so much team! Maybe not the right place to ask this, but is there a plan for a 3.4.2 release? I can see that there's a 3.5.2 just waiting on some things.
On the KIP front, I am still interested in pursuing that, but it's not as pressing a concern. Reading through the guidance there (thanks Philip!) it looks like I have to request committer status first? I'll probably get some CCLA stuff rolling on my end to make that happen.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
> Reporter: Luke Kirby
> Assignee: Matthias J. Sax
> Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have solved the situation described by KAFKA-4852, namely, to have ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 offsets (or, put another way, to honor the buffer's position() as the start point to consume bytes from). Unfortunately, it failed to actually do this, and instead changed the expectations for how an input ByteBuffer's limit and position should be set before being provided to send() on a producer configured with ByteBufferSerializer. Code that worked with pre-3.4.0 releases now produces 0-length messages instead of the intended messages, effectively introducing a breaking change for existing users of the serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain wrappers of byte arrays continue to work under both versions due to the special case in the serializer for them. I suspect that this is the dominant use case, which is why this has apparently gone unreported to this point. The wrapped-with-offset case fails under both versions, for different reasons (the expected value would be "est"). As demonstrated here, you can ensure that a manually assembled ByteBuffer will work under both versions by making sure that your buffers have position == limit == message-length (and an actual desired start position of 0). Clearly, though, behavior has changed dramatically for the second and third cases there, with the 3.3.2 behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java], the serializer would just rewind() the buffer and respect the limit as the indicator of how much data was in the buffer. So, essentially, the prevailing contract was that the data from position 0 (always!) up to the limit on the buffer would be serialized; so it was really just the limit that was honored.
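The "position 0 up to the limit" contract described above can be reproduced with plain `java.nio`, without Kafka on the classpath. The following is only a sketch: `fromZeroToLimit` is a hypothetical helper that mimics the rewind-then-read behavior as described in this issue, `fromPositionToLimit` mimics what KAFKA-4852 asked for, and neither is the actual `ByteBufferSerializer` code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferContracts {

    /** Mimics the pre-3.4.0 contract as described: rewind, then copy bytes 0..limit. */
    public static byte[] fromZeroToLimit(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // leave the caller's position and limit alone
        view.rewind();                      // position = 0, limit unchanged
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    /** Mimics what KAFKA-4852 asked for: copy bytes position..limit. */
    public static byte[] fromPositionToLimit(ByteBuffer data) {
        ByteBuffer view = data.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    private static String utf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] test = "test".getBytes(StandardCharsets.UTF_8);

        // Conventional write-then-flip usage: position = 0, limit = 4, capacity = 8.
        ByteBuffer flipped = ByteBuffer.allocate(8);
        flipped.put(test);
        flipped.flip();

        // View onto a sub-range of the array: position = 1, limit = 4.
        ByteBuffer offset = ByteBuffer.wrap(test, 1, 3);

        System.out.println(utf8(fromZeroToLimit(flipped)));    // prints "test"
        System.out.println(utf8(fromZeroToLimit(offset)));     // prints "test" (position ignored)
        System.out.println(utf8(fromPositionToLimit(offset))); // prints "est"
    }
}
```

The second and third lines of output show why the offset case was wrong before the change: honoring only the limit drops the caller's start position, while honoring both position and limit yields the expected "est".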
So if, per the original issue, you have a byte[] array wrapped > with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() > with position = 3 indicating the desired start point to read from, but > effectively ignored by the serializer due to the rewind(). > So while the serializer didn't work when presenting a ByteBuffer view onto a > sub-view of a backing array, it did however follow expected behavior when > employing standard patterns to populate ByteBuffers backed by > larger-than-necessary arrays and using limit() to identify the end of actual > data, consistent with conventional usage of flip() to switch from writing to > a buffer to setting it up to be read from (e.g., to be passed into a > producer.send() call). E.g., > {code:java} > ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH); > ... // some sequence of > bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH > ... > bb.flip(); /* logically, this says "I am done writing, let's set this up for > reading"; pragmatically, it sets the limi
[jira] [Updated] (KAFKA-15770) org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15770: Component/s: unit tests > org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy > is flaky > --- > > Key: KAFKA-15770 > URL: https://issues.apache.org/jira/browse/KAFKA-15770 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Alok Thatikunta >Priority: Major > > Test fails on CI, passes locally > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldHaveSamePositionBoundActiveAndStandBy/] > {code:java} > java.lang.AssertionError: > Result:SucceededQueryResult{result=<0,1698511250443>, executionInfo=[], > position=Position{position={input-topic={0=50 > Expected: is > but: was {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm opened a new pull request, #14690:
URL: https://github.com/apache/kafka/pull/14690

Updates to the client state machine and to the Heartbeat and Commit managers to support the states and transitions required as a member interacts with a consumer group, so that the member can commit offsets and receive an assignment that it then reconciles.
Re: [PR] KAFKA-15627: KIP-951's Leader discovery optimisations on the client [kafka]
wcarlson5 merged PR #14685: URL: https://github.com/apache/kafka/pull/14685 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15627: KIP-951's Leader discovery optimisations on the client [kafka]
wcarlson5 commented on PR #14685:
URL: https://github.com/apache/kafka/pull/14685#issuecomment-1789576997

I'm satisfied with the comments and discussion on the old PR https://github.com/apache/kafka/pull/14564, so I'm going to go ahead and merge this.
Re: [PR] KAFKA-15627: KIP-951's Leader discovery optimisations on the client [kafka]
wcarlson5 commented on code in PR #14685: URL: https://github.com/apache/kafka/pull/14685#discussion_r1379261920 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache); } +/** + * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever + * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse). + * Note that the updates via Metadata RPC are handled separately in ({@link #update}). + * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is. + * @param partitionLeaders map of new leadership information for partitions. + * @param leaderNodes a list of nodes for leaders in the above map. + * @return a set of partitions, for which leaders were updated. + */ +public synchronized Set updatePartially(Map partitionLeaders, List leaderNodes) { +Map newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node)); +// Insert non-overlapping nodes from existing-nodes into new-nodes. +this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node)); + +// Create partition-metadata for all updated partitions. Exclude updates for partitions - +// 1. for which the corresponding partition has newer leader in existing metadata. +// 2. for which corresponding leader's node is missing in the new-nodes. +// 3. for which the existing metadata doesn't know about the partition. 
+List updatePartitionMetadata = new ArrayList<>(); +Set updatedPartitions = new HashSet<>(); +for (Entry partitionLeader: partitionLeaders.entrySet()) { +TopicPartition partition = (TopicPartition) partitionLeader.getKey(); +Metadata.LeaderAndEpoch currentLeader = currentLeader(partition); +Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue(); +if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) { +log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader); +continue; +} +if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) { +log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader); +continue; +} +if (!newNodes.containsKey(newLeader.leaderId.get())) { +log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get()); +continue; +} +if (!this.cache.partitionMetadata(partition).isPresent()) { +log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader); +continue; +} + +MetadataResponse.PartitionMetadata existingMetadata = this.cache.partitionMetadata(partition).get(); +MetadataResponse.PartitionMetadata updatedMetadata = new MetadataResponse.PartitionMetadata( +existingMetadata.error, +partition, +newLeader.leaderId, +newLeader.epoch, +existingMetadata.replicaIds, +existingMetadata.inSyncReplicaIds, +existingMetadata.offlineReplicaIds +); +updatePartitionMetadata.add(updatedMetadata); + +lastSeenLeaderEpochs.put(partition, newLeader.epoch.get()); + +updatedPartitions.add(partition); +} + +if (updatedPartitions.isEmpty()) { +log.debug("No relevant metadata updates."); +return updatedPartitions; +} + +// Get topic-ids for filtered partitions from existing topic-ids. 
+Map existingTopicIds = this.cache.topicIds(); +Map filteredTopicIds = updatePartitionMetadata.stream() +.filter(e -> existingTopicIds.containsKey(e.topic())) +.collect(Collectors.toMap(e -> e.topic(), e -> existingTopicIds.get(e.topic(; + +Set updatedTopics = updatePartitionMetadata.stream().map(MetadataResponse.PartitionMetadata::topic).collect(Collectors.toSet()); + +if (log.isTraceEnabled()) { +updatePartitionMetadata.forEach( +partMetadata -> log.trace("For {} updating to leader-id {}, leader-epoch {}.", partMetadata, partMetadata.leaderId.get(), partMetadata.leaderEpoch.get(
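The filtering rules in the quoted `updatePartially` hunk (skip incomplete updates, skip updates that are not newer than the cached epoch, skip leaders whose node is unknown, skip partitions with no cached metadata) can be sketched in isolation as follows. This is a simplified, hypothetical model and not the client's code: partitions are plain strings, `cachedEpochs` stands in for the metadata cache, and the class and method names are invented for the illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

public class PartialLeaderUpdateSketch {

    /** Simplified stand-in for the leader-id/epoch pair carried in a response. */
    public static final class LeaderIdAndEpoch {
        final Optional<Integer> leaderId;
        final Optional<Integer> epoch;

        public LeaderIdAndEpoch(Optional<Integer> leaderId, Optional<Integer> epoch) {
            this.leaderId = leaderId;
            this.epoch = epoch;
        }
    }

    /**
     * Returns the partitions whose leader update would be accepted.
     * cachedEpochs: partition -> currently known leader epoch (key absent = partition unknown).
     * knownNodeIds: node ids available after merging existing and incoming nodes.
     */
    public static Set<String> acceptedUpdates(Map<String, Optional<Integer>> cachedEpochs,
                                              Set<Integer> knownNodeIds,
                                              Map<String, LeaderIdAndEpoch> incoming) {
        Set<String> accepted = new TreeSet<>();
        for (Map.Entry<String, LeaderIdAndEpoch> entry : incoming.entrySet()) {
            String partition = entry.getKey();
            LeaderIdAndEpoch update = entry.getValue();
            if (!update.leaderId.isPresent() || !update.epoch.isPresent()) {
                continue; // incomplete leader information
            }
            if (!cachedEpochs.containsKey(partition)) {
                continue; // no cached metadata for this partition
            }
            Optional<Integer> current = cachedEpochs.get(partition);
            if (current.isPresent() && update.epoch.get() <= current.get()) {
                continue; // incoming leader is not newer than the cached one
            }
            if (!knownNodeIds.contains(update.leaderId.get())) {
                continue; // node information for the new leader is missing
            }
            accepted.add(partition);
        }
        return accepted;
    }
}
```

The order of the checks differs slightly from the diff, which does not change the result because every failed check simply skips the partition.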
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1379231747 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT); +public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor to use. If no assignor is specified, " + Review Comment: To make it more consistent: DEFAULT_GROUP_REMOTE_ASSIGNOR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
cmccabe commented on PR #14682:
URL: https://github.com/apache/kafka/pull/14682#issuecomment-1789520190

Thanks for the PR, @lzyLuke. Please remove these two lines from "The following features are not fully implemented in KRaft mode":
> Modifying certain dynamic configurations on the standalone KRaft controller
> Delegation tokens
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
cmccabe commented on code in PR #14682: URL: https://github.com/apache/kafka/pull/14682#discussion_r1379209724 ## docs/ops.html: ## @@ -3613,23 +3613,22 @@ https://issues.apache.org/jira/projects/KAFKA"; target="_blank">project JIRA and the "kraft" component. - - Terminology -Brokers that are in ZK mode store their metadata in Apache ZooKeeper. This is the old mode of handling metadata. +Brokers that are in ZK mode store their metadata in Apache ZooKepeer. This is the old mode of handling metadata. Brokers that are in KRaft mode store their metadata in a KRaft quorum. This is the new and improved mode of handling metadata. Migration is the process of moving cluster metadata from ZooKeeper into a KRaft quorum. Migration Phases In general, the migration process passes through several phases. + + For -In the initial phase, all the brokers are in ZK mode, and there is a ZK-based controller. -During the initial metadata load, a KRaft quorum loads the metadata from ZooKeeper, -In hybrid phase, some brokers are in ZK mode, but there is a KRaft controller. -In dual-write phase, all brokers are KRaft, but the KRaft controller is continuing to write to ZK. -When the migration has been finalized, we no longer write metadata to ZooKeeper. +In the pre-migration phase, all the brokers are in ZK mode, and there is a ZK-based controller. But a KRaft controller has been provisioned and has migration enabled Review Comment: Change "controller" to "controller quorum" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
cmccabe commented on code in PR #14682: URL: https://github.com/apache/kafka/pull/14682#discussion_r1379209314 ## docs/ops.html: ## @@ -3613,23 +3613,22 @@ https://issues.apache.org/jira/projects/KAFKA"; target="_blank">project JIRA and the "kraft" component. - - Terminology -Brokers that are in ZK mode store their metadata in Apache ZooKeeper. This is the old mode of handling metadata. +Brokers that are in ZK mode store their metadata in Apache ZooKepeer. This is the old mode of handling metadata. Review Comment: This adds a typo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
AndrewJSchofield commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1379170868 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT); +public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; Review Comment: `GROUP_REMOTE_ASSIGNOR_CONFIG` would be better. ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT); +public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. 
Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor to use. If no assignor is specified, " + Review Comment: `GROUP_REMOTE_ASSIGNOR_DOC` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
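For readers following the naming discussion, this is roughly how the two settings under review would be supplied by an application. It is a sketch under assumptions: the keys `group.protocol` and `group.remote.assignor` and the values "generic" and "consumer" are taken from the diff quoted above and may change before release, the helper class is hypothetical, and the assignor name "uniform" in the usage note is only a placeholder. Plain `java.util.Properties` is used so that the example does not depend on the constant names still being debated.

```java
import java.util.Locale;
import java.util.Properties;

public class GroupProtocolProps {

    /**
     * Builds consumer properties for the given group protocol.
     * remoteAssignor may be null, in which case the key is left unset,
     * matching the null default shown in the diff.
     */
    public static Properties consumerProps(String protocol, String remoteAssignor) {
        Properties props = new Properties();
        // Normalised the same way as the default in the diff (lower case, Locale.ROOT).
        props.setProperty("group.protocol", protocol.toLowerCase(Locale.ROOT));
        if (remoteAssignor != null) {
            props.setProperty("group.remote.assignor", remoteAssignor);
        }
        return props;
    }
}
```

For example, `consumerProps("CONSUMER", "uniform")` yields `group.protocol=consumer` and `group.remote.assignor=uniform`, while `consumerProps("generic", null)` sets only `group.protocol=generic`.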
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
lzyLuke commented on code in PR #14682: URL: https://github.com/apache/kafka/pull/14682#discussion_r1379152694 ## docs/ops.html: ## @@ -3787,7 +3786,7 @@ Migrating brokers to KRaft Reverting to ZooKeeper mode During the Migration While the cluster is still in migration mode, it is possible to revert to ZK mode. In order to do this: - One by one, take each KRaft broker down. Remove the __cluster_metadata directory on the broker. Then, restart the broker as ZooKeeper. + One by one, take each KRaft broker down. Remove the __cluster_metadata directory on the broker. Then, restart the broker as zookeeper mode. Review Comment: maybe `in` would be better :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org