[jira] [Updated] (KAFKA-15701) Allow use of user policy in CreateTopicPolicy
[ https://issues.apache.org/jira/browse/KAFKA-15701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiao Zhang updated KAFKA-15701: --- Description: One use case of CreateTopicPolicy we have experienced is allowing/rejecting topic creation by checking the user. Especially for secured clusters, we add ACLs to specific users to allow topic creation. At the same time, we need to design customized create-topic policies for different users. For example, for user A, topic creation is allowed only when the partition number is within a limit; for user B, we allow topic creation without any check. From the standpoint of a Kafka service provider, user A represents a random user of the Kafka service and user B represents an internal user for cluster management. For this need, we patched our local fork of Kafka by passing the user principal in KafkaApis. One place that needs revision is here: [https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980] I think it's natural to also support this kind of usage upstream, so I raised this Jira to ask for the community's ideas.
> Allow use of user policy in CreateTopicPolicy > -- > > Key: KAFKA-15701 > URL: https://issues.apache.org/jira/browse/KAFKA-15701 > Project: Kafka > Issue Type: Improvement > Reporter: Jiao Zhang > Priority: Minor -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15701) Allow use of user policy in CreateTopicPolicy
Jiao Zhang created KAFKA-15701: -- Summary: Allow use of user policy in CreateTopicPolicy Key: KAFKA-15701 URL: https://issues.apache.org/jira/browse/KAFKA-15701 Project: Kafka Issue Type: Improvement Reporter: Jiao Zhang One use case of CreateTopicPolicy we have experienced is allowing/rejecting topic creation by checking the user. Especially for secured clusters, we add ACLs to specific users to allow topic creation. At the same time, we need to design customized create-topic policies for different users. For example, for user A, topic creation is allowed only when the partition number is within a limit; for user B, we allow topic creation without any check. From the standpoint of a Kafka service provider, user A represents a random user of the Kafka service and user B represents an internal user for cluster management. For this need, we patched our local fork of Kafka by passing the user principal in KafkaApis. One place that needs revision is here: [https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980] Since it seems natural to support this kind of usage upstream as well, I raised this Jira to ask for the community's ideas. -- This message was sent by Atlassian Jira (v8.20.10#820010)
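A rough sketch of the per-user policy the ticket describes follows. Note that upstream `CreateTopicPolicy.RequestMetadata` does not currently expose the requesting principal, so `PrincipalAwareRequestMetadata` and the `internal-admin` principal name below are hypothetical stand-ins used only to illustrate the idea, not the real Kafka API:

```java
// Illustrative only: a principal-aware create-topic check. A real policy
// would implement org.apache.kafka.server.policy.CreateTopicPolicy and
// throw PolicyViolationException; the principal field is the (not yet
// existing) extension the ticket proposes.
public class PrincipalAwarePolicySketch {

    static final class PrincipalAwareRequestMetadata {
        final String topic;
        final int numPartitions;
        final String principalName; // hypothetical extra field
        PrincipalAwareRequestMetadata(String topic, int numPartitions, String principalName) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.principalName = principalName;
        }
    }

    static final int MAX_PARTITIONS_FOR_EXTERNAL_USERS = 12;

    // Returns null when creation is allowed, or a rejection reason otherwise.
    static String validate(PrincipalAwareRequestMetadata request) {
        if ("internal-admin".equals(request.principalName)) {
            return null; // trusted internal user: no partition-count check
        }
        if (request.numPartitions > MAX_PARTITIONS_FOR_EXTERNAL_USERS) {
            return "partition count " + request.numPartitions + " exceeds limit";
        }
        return null;
    }

    public static void main(String[] args) {
        // Internal user bypasses the check; external user is limited.
        System.out.println(validate(new PrincipalAwareRequestMetadata("t1", 100, "internal-admin")));
        System.out.println(validate(new PrincipalAwareRequestMetadata("t2", 100, "user-a")));
    }
}
```

This mirrors the "user A is checked, user B is exempt" behavior described above, with the policy branching on the principal instead of applying one rule to everyone.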
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
mjsax commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1374065471 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A LastValueTracker<T> uses a ConcurrentMap to maintain historic values for a given key, and return Review Comment: For my own education: Why do we need a `ConcurrentMap`? ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A LastValueTracker<T> uses a ConcurrentMap to maintain historic values for a given key, and return + * a previous value and an Instant for that value. + * + * @param <T> The type of the value. + */ +public class LastValueTracker<T> { +private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>(); + +/** + * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one. + * + * @param metricKey the key for which to calculate a getAndSet. + * @param now the timestamp for the new value. + * @param value the current value. + * @return the timestamp of the previous entry and its value. If there + * isn't a previous entry, then this method returns {@link Optional#empty()} + */ +public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) { +InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value); +AtomicReference<InstantAndValue<T>> valueOrNull = counters Review Comment: Not sure why we need to wrap `instantAndValue` in an `AtomicReference`. Can you elaborate? ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import
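For readers following the review thread above, here is a much-simplified, self-contained sketch of the getAndSet pattern under discussion. `MetricKey` is reduced to a `String`, and there is no `AtomicReference` wrapper: when only a get-and-set is needed, `ConcurrentMap.put` is already an atomic per-key swap that returns the previous value, which is also why a plain `HashMap` would not be safe if metrics are emitted from several threads.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in mirroring the PR's LastValueTracker, simplified
// for illustration; this is not the actual Kafka class.
public class LastValueSketch<T> {

    // Pair of (timestamp, value), like the PR's InstantAndValue<T>.
    public static final class InstantAndValue<T> {
        final Instant instant;
        final T value;
        InstantAndValue(Instant instant, T value) {
            this.instant = instant;
            this.value = value;
        }
        public T value() { return value; }
    }

    private final ConcurrentMap<String, InstantAndValue<T>> counters = new ConcurrentHashMap<>();

    // Atomically replace the stored (instant, value) for this key and
    // return the previous pair, if any. put() on a ConcurrentMap is atomic
    // per key, so concurrent callers never observe a torn update.
    public Optional<InstantAndValue<T>> getAndSet(String key, Instant now, T value) {
        InstantAndValue<T> previous = counters.put(key, new InstantAndValue<>(now, value));
        return Optional.ofNullable(previous);
    }

    public static void main(String[] args) {
        LastValueSketch<Long> tracker = new LastValueSketch<>();
        // First observation: no previous value exists.
        System.out.println(tracker.getAndSet("bytes-sent", Instant.now(), 10L).isPresent()); // false
        // Second observation: the previous value (10) comes back.
        System.out.println(tracker.getAndSet("bytes-sent", Instant.now(), 25L)
                .map(InstantAndValue::value).orElse(-1L)); // 10
    }
}
```

This sketch deliberately omits the `AtomicReference` the reviewer asks about; whether that extra wrapper is needed depends on operations beyond plain getAndSet in the real class.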
[jira] [Commented] (KAFKA-15695) Local log start offset is not updated on the follower after rebuilding remote log auxiliary state
[ https://issues.apache.org/jira/browse/KAFKA-15695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780166#comment-17780166 ] Kamal Chandraprakash commented on KAFKA-15695: -- [~nikramakrishnan] I remember fixing this issue in [#14328|https://github.com/apache/kafka/pull/14328] and covering it via an integration test. Can you come up with a test to reproduce this issue? Thanks! > Local log start offset is not updated on the follower after rebuilding remote > log auxiliary state > - > > Key: KAFKA-15695 > URL: https://issues.apache.org/jira/browse/KAFKA-15695 > Project: Kafka > Issue Type: Bug > Components: replication, Tiered-Storage >Affects Versions: 3.6.0 >Reporter: Nikhil Ramakrishnan >Assignee: Nikhil Ramakrishnan >Priority: Major > Labels: KIP-405, tiered-storage > Fix For: 3.7.0 > > > In 3.6, the local log start offset is not updated when reconstructing the > auxiliary state of the remote log on a follower. > The impact of this bug is significant because, if this follower becomes the > leader before the local log start offset has had a chance to be updated, > reads from any offset between [wrong log start offset; actual log start > offset] will be routed to the local storage, which does not contain the > corresponding data. Consumer reads will in this case never be satisfied. > > Reproduction case: > # Create a cluster with 2 brokers, broker 0 and broker 1. > # Create a topic topicA with RF=2, 1 partition (topicA-0) and 2 batches per > segment, with broker 0 as the leader. > # Stop broker 1, and produce 3 records to topicA, such that segment 1 with > the first two records is copied to remote and deleted from local storage. > # Start broker 1, let it catch up with broker 0. > # Stop broker 0 such that broker 1 is elected as the leader, and try to > consume from the beginning of topicA-0.
> This consumer read will not be satisfied because the local log start offset > is not updated on broker 1 when it builds the auxiliary state of the remote > log segments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1782275615 Hi @lucasbru - Thanks for taking the time to review my PR. I addressed all but 2 comments: 1. Mockito: Could you be more specific on how you expect to mock the response object? 2. Error handling: Essentially all errors in `continueHandlePartitionErrors` can only happen in the response. I understand there is some redundancy there and it can be a bit confusing, but the completion can signal either a hard failure (response = null and throwable = non-null) or a server-side error (response = non-null). That is why the two paths were kept separate. If you still find it unclear, how would you suggest making it more readable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
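The two failure modes described in the comment above (transport-level failure vs. error carried inside a response) can be illustrated with a hypothetical, self-contained classifier. The names and the string-based "response" are invented for illustration; this is not the actual `CommitRequestManager` code:

```java
// Sketch of the (response, throwable) completion convention discussed above:
// exactly one of the two is meaningful. A hard failure arrives as a non-null
// throwable with a null response; a server-side error arrives inside a
// non-null response carrying an error marker.
public class CompletionSketch {

    static String classify(String response, Throwable error) {
        if (error != null) {
            // Hard failure: the request never completed, response is null here.
            return "hard-failure: " + error.getMessage();
        }
        if (response.startsWith("ERROR")) {
            // Server-side error: transport succeeded, broker reported an error.
            return "server-error: " + response;
        }
        return "ok: " + response;
    }

    public static void main(String[] args) {
        System.out.println(classify(null, new RuntimeException("disconnected")));
        System.out.println(classify("ERROR:NOT_COORDINATOR", null));
        System.out.println(classify("committed", null));
    }
}
```

Keeping the two branches separate, as the PR author argues, makes the invariant explicit: a null response is only ever paired with a non-null throwable, and vice versa.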
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1374057359 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -127,7 +146,7 @@ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an * {@link OffsetCommitRequestState} and enqueues it to send later. */ -public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) { +public OffsetCommitRequestState addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) { Review Comment: Got it, I think the main reason was to test retryBackoff. I made some changes according to your suggestions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1374013380 ## streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java: ## @@ -34,6 +34,9 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements StateStore, CachedStateStore<K, V> { public static boolean isTimestamped(final StateStore stateStore) { +if (stateStore instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) { Review Comment: got it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
mjsax commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1374019186 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a> + */ +public class MetricNamingConvention { + +private static final String NAME_JOINER = "."; +private static final String TAG_JOINER = "_"; + +// remove metrics as it is redundant for telemetry metrics naming convention +private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)"); + +public static MetricNamingStrategy<MetricName> getClientTelemetryMetricNamingStrategy(String domain) { +return new MetricNamingStrategy<MetricName>() { +@Override +public MetricKey metricKey(MetricName metricName) { +Objects.requireNonNull(metricName, "metric name cannot be null"); +String group = metricName.group() == null ? "" : metricName.group(); +String rawName = metricName.name() == null ? "" : metricName.name(); + +return new MetricKey(fullMetricName(domain, group, rawName), +Collections.unmodifiableMap(cleanTags(metricName.tags()))); +} + +@Override +public MetricKey derivedMetricKey(MetricKey key, String derivedComponent) { +Objects.requireNonNull(derivedComponent, "derived component cannot be null"); +return new MetricKey(key.getName() + NAME_JOINER + derivedComponent, key.tags()); +} +}; +} + +/** + * Creates a metric name given the domain, group, and name. The new String follows the following + * conventions and rules: + * + * + * domain is expected to be a host-name like value, e.g.
{@code org.apache.kafka} + * group is cleaned of redundant words: "-metrics" + * the group and metric name is dot separated + * The name is created by joining the three components, e.g.: + * {@code org.apache.kafka.producer.connection.creation.rate} + * + */ +private static String fullMetricName(String domain, String group, String name) { +return domain ++ NAME_JOINER ++ cleanGroup(group) ++ NAME_JOINER ++ cleanMetric(name); +} + +/** + * This method maps a raw name to follow conventions and cleans up the result to be more legible: + * + * converts names to lower hyphen case conventions + * strips redundant parts of the metric name, such as -metrics + * normalizes artifacts of hyphen case to dot separated conversion + * + */ +private static String cleanGroup(String group) { +group = clean(group, NAME_JOINER); +return GROUP_PATTERN.matcher(group).replaceAll(""); +} + +private static String cleanMetric(String metric) { +return clean(metric, NAME_JOINER); +} + +/** + * Converts a tag name to match the telemetry naming conventions by converting snake_case. + * + * Kafka metrics have tags name in lower case separated by hyphens. Eg: total-errors + * + * @param raw the input map + * @return the new map with keys replaced by snake_case representations. + */ +private static Map<String, String> cleanTags(Map<String, String> raw) { +return raw.entrySet() +.stream() +.collect(Collectors.toMap(s -> clean(s.getKey(), TAG_JOINER), Entry::getValue)); +} + +private static String clean(String raw, String joiner) { +Objects.requireNonNull(raw, "metric
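As a rough, self-contained illustration of the naming convention the class above implements (this is a simplified reimplementation for illustration, not the actual Kafka code):

```java
import java.util.Locale;

// Sketch of the KIP-714 telemetry naming convention described above:
// hyphenated Kafka metric names are lower-cased, hyphens become dots,
// and the redundant "-metrics" group suffix is stripped.
public class NamingSketch {

    static String fullMetricName(String domain, String group, String name) {
        // "producer-metrics" -> "producer.metrics" -> "producer"
        return domain + "." + clean(group).replaceAll("\\.metrics", "") + "." + clean(name);
    }

    private static String clean(String raw) {
        // Lower-case and normalize hyphen case to dot separation.
        return raw.toLowerCase(Locale.ROOT).replace('-', '.');
    }

    public static void main(String[] args) {
        // Kafka metric (group "producer-metrics", name "connection-creation-rate")
        // mapped to its telemetry name.
        System.out.println(fullMetricName("org.apache.kafka", "producer-metrics", "connection-creation-rate"));
        // -> org.apache.kafka.producer.connection.creation.rate
    }
}
```

The result matches the example given in the javadoc above: {@code org.apache.kafka.producer.connection.creation.rate}.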
[jira] [Resolved] (KAFKA-15390) FetchResponse.preferredReplica may contains fenced replica in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deng Ziming resolved KAFKA-15390. - Fix Version/s: 3.6.0 Resolution: Fixed > FetchResponse.preferredReplica may contains fenced replica in KRaft mode > > > Key: KAFKA-15390 > URL: https://issues.apache.org/jira/browse/KAFKA-15390 > Project: Kafka > Issue Type: Bug >Reporter: Deng Ziming >Assignee: Deng Ziming >Priority: Major > Fix For: 3.6.0 > > > `KRaftMetadataCache.getPartitionReplicaEndpoints` will return a fenced broker > id. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1374040965 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -382,9 +522,49 @@ private ClientResponse buildOffsetFetchClientResponse( return buildOffsetFetchClientResponse(request, topicPartitionData, error); } +private ClientResponse buildOffsetCommitClientResponse(final OffsetCommitResponse commitResponse, + final Errors error) { +OffsetCommitResponseData data = new OffsetCommitResponseData(); +OffsetCommitResponse response = new OffsetCommitResponse(data); +short apiVersion = 1; +return new ClientResponse( +new RequestHeader(ApiKeys.OFFSET_COMMIT, apiVersion, "", 1), +null, +"-1", +time.milliseconds(), +time.milliseconds(), +false, +null, +null, +commitResponse +); +} + +public ClientResponse mockOffsetCommitResponse(String topic, int partition, short apiKeyVersion, Errors error) { Review Comment: Could you be more specific? Do you mean something like this?
``` response = mock(ClientResponse.class); when(response.receivedTimeMs()).thenReturn(...); when(response.data()).thenReturn(responseData); return response; ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -382,9 +522,49 @@ private ClientResponse buildOffsetFetchClientResponse( return buildOffsetFetchClientResponse(request, topicPartitionData, error); } +private ClientResponse buildOffsetCommitClientResponse(final OffsetCommitResponse commitResponse, + final Errors error) { +OffsetCommitResponseData data = new OffsetCommitResponseData(); +OffsetCommitResponse response = new OffsetCommitResponse(data); +short apiVersion = 1; +return new ClientResponse( +new RequestHeader(ApiKeys.OFFSET_COMMIT, apiVersion, "", 1), +null, +"-1", +time.milliseconds(), +time.milliseconds(), +false, +null, +null, +commitResponse +); +} + +public ClientResponse mockOffsetCommitResponse(String topic, int partition, short apiKeyVersion, Errors error) { Review Comment: Could you be specific: Do you mean by doing something like this? ``` response = mock(ClientResponse.class); when(response.receivedTimeMs()).thenReturn(...); when(response.data()).thenReturn(responseData); return response; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
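The `mock(...)`/`when(...)` calls in the snippet above are Mockito stubbing: the test receives a canned object instead of constructing a real `ClientResponse`. As an illustration only (not the Kafka test code), here is a minimal JDK-only sketch of the same idea using a dynamic proxy; the `Response` interface is a hypothetical stand-in for `ClientResponse`:

```java
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

public class MockSketch {

    // Hypothetical stand-in for Kafka's ClientResponse; NOT the real class.
    interface Response {
        long receivedTimeMs();
        String data();
    }

    // Returns canned values per method name -- the core of what Mockito's
    // mock(...) + when(...).thenReturn(...) does for you.
    static Response mockResponse(Map<String, Object> canned) {
        return (Response) Proxy.newProxyInstance(
                Response.class.getClassLoader(),
                new Class<?>[]{Response.class},
                (proxy, method, args) -> canned.get(method.getName()));
    }

    public static void main(String[] args) {
        Map<String, Object> canned = new HashMap<>();
        canned.put("receivedTimeMs", 1234L);
        canned.put("data", "offset-commit-response");
        Response r = mockResponse(canned);
        System.out.println(r.receivedTimeMs()); // 1234
        System.out.println(r.data());           // offset-commit-response
    }
}
```

The trade-off being debated in the review is exactly this: a stub returns whatever you cann in, whereas building the real response object exercises the actual constructors and serialization paths.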
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
philipnee commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1374037210 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { + +private final String name; +private final Map tags; + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the telemetry metric name of the metric (the final name + * under which this metric is emitted). + */ +public MetricKey(String name) { +this(name, null); +} + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + * @param tags mapping of tag keys to values. + */ +public MetricKey(String name, Map tags) { +this.name = Objects.requireNonNull(name); +this.tags = tags != null ? 
Collections.unmodifiableMap(tags) : Collections.emptyMap(); +} + +public MetricKey(MetricName metricName) { +this(metricName.name(), metricName.tags()); +} + +@Override +public MetricKey key() { +return this; +} + +public String getName() { Review Comment: I was told the same, but it doesn't seem to be enforced everywhere. A call for an Apache Kafka style guide. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1782236753 Hi @kirktrue, thanks for taking the time to review my code. I made changes according to your suggestions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374036176 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = "generic"; +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support GENERIC or CONSUMER. If CONSUMER is specified, then the consumer group protocol will be used. " + +"Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server side assignor to use. It cannot be used in " + +"conjunction with group.local.assignor. The group coordinator will choose the assignor if no " + Review Comment: Good call. Yes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374035895 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), Review Comment: 108 char width! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374035641 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), +Importance.HIGH, +GROUP_PROTOCOL_DOC) +.define(REMOTE_ASSIGNOR_CONFIG, +Type.STRING, +DEFAULT_REMOTE_ASSIGNOR, +Importance.MEDIUM, +REMOTE_ASSIGNOR_DOC) Review Comment: You are right! I kept it that way because I was trying to avoid editing a few hundred lines of code! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15445: [WIP] Add JVM Docker image [kafka]
sanjay-awatramani commented on code in PR #14552: URL: https://github.com/apache/kafka/pull/14552#discussion_r1374022606 ## docker/docker_release.py: ## @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Python script to build and push docker image +Usage: docker_release.py + +Interactive utility to push the docker image to dockerhub +""" + +import subprocess +from distutils.dir_util import copy_tree +from datetime import date +import shutil + +def push_jvm(image, kafka_url): +copy_tree("resources", "jvm/resources") +subprocess.run(["docker", "buildx", "build", "-f", "jvm/Dockerfile", "--build-arg", f"kafka_url={kafka_url}", "--build-arg", f"build_date={date.today()}", +"--push", +"--platform", "linux/amd64,linux/arm64", +"--tag", image, "jvm"]) +shutil.rmtree("jvm/resources") + +def login(): +status = subprocess.run(["docker", "login"]) +if status.returncode != 0: +print("Docker login failed, aborting the docker release") +raise PermissionError + +def create_builder(): +subprocess.run(["docker", "buildx", "create", "--name", "kafka-builder", "--use"]) + +def remove_builder(): +subprocess.run(["docker", "buildx", "rm", "kafka-builder"]) + +if __name__ == "__main__": Review Comment: This 
is an interactive script. How would we support automating the release process via GitHub Actions or Jenkins? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]
ocadaruma commented on code in PR #14242: URL: https://github.com/apache/kafka/pull/14242#discussion_r1374014105 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -462,22 +462,32 @@ public Optional lastEntry(long producerId) { } /** - * Take a snapshot at the current end offset if one does not already exist. + * Take a snapshot at the current end offset if one does not already exist with syncing the change to the device */ public void takeSnapshot() throws IOException { +takeSnapshot(true); +} + +/** + * Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken. + */ +public Optional takeSnapshot(boolean sync) throws IOException { Review Comment: Thanks, that's a good point. I overlooked that snapshot files would be cleaned up upon receiving `OffsetMovedToRemoteStorage`. In this case, if the async flush is performed against a non-existent file, it would throw an IOException, so we should catch it and ignore it if it's a NoSuchFileException. (Since file creation is still done in the original thread, it shouldn't conflict with `truncateFullyAndReloadSnapshots`; only the fsync is moved to the async thread.) I'll fix that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
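To make the proposed fix concrete, here is a minimal, self-contained sketch of fsync-ing a snapshot file while tolerating concurrent deletion. The `flushIfExists` helper name is hypothetical, and this is plain NIO rather than the actual `ProducerStateManager` change:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class AsyncFlushSketch {

    // Hypothetical helper; in the PR the logic would live in ProducerStateManager / UnifiedLog.
    static void flushIfExists(Path snapshot) throws IOException {
        try (FileChannel ch = FileChannel.open(snapshot, StandardOpenOption.READ)) {
            // Flush file content and metadata to the storage device.
            ch.force(true);
        } catch (NoSuchFileException e) {
            // The snapshot was deleted concurrently (e.g. cleanup on
            // OffsetMovedToRemoteStorage); safe to ignore, because only the
            // fsync -- not file creation -- runs on the async thread.
        }
    }

    public static void main(String[] args) throws IOException {
        // Flushing a file that no longer exists must not throw.
        flushIfExists(Paths.get("no-such-producer-state.snapshot"));
        System.out.println("ok");
    }
}
```

The design point from the thread: since the file is created on the original thread and only the fsync moves to the async thread, a missing file can only mean a legitimate concurrent cleanup, so swallowing `NoSuchFileException` (but not other `IOException`s) is safe.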
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
mjsax commented on code in PR #14618: URL: https://github.com/apache/kafka/pull/14618#discussion_r1374016565 ## build.gradle: ## @@ -295,7 +296,12 @@ subprojects { } publications { mavenJava(MavenPublication) { - from components.java + if (!shouldPublishWithShadow) { +from components.java + } else { +apply plugin: 'com.github.johnrengelman.shadow' Review Comment: What is this plugin? It does not look like something official. Not sure if we should use something that is not backed by an official OSS project? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
mjsax commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1374007941 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { + +private final String name; +private final Map tags; + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the telemetry metric name of the metric (the final name + * under which this metric is emitted). + */ +public MetricKey(String name) { +this(name, null); +} + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + * @param tags mapping of tag keys to values. + */ +public MetricKey(String name, Map tags) { +this.name = Objects.requireNonNull(name); +this.tags = tags != null ? 
Collections.unmodifiableMap(tags) : Collections.emptyMap(); +} + +public MetricKey(MetricName metricName) { +this(metricName.name(), metricName.tags()); +} + +@Override +public MetricKey key() { +return this; +} + +public String getName() { Review Comment: nit: we don't use the `get` prefix as a convention; it should be renamed to `name()` ## clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.telemetry; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.nio.ByteBuffer; + +@InterfaceStability.Evolving +public interface ClientTelemetryPayload { Review Comment: Should we add a class-level JavaDoc describing it? It's a public interface, so it seems important. ## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetry.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.annotation.InterfaceStability; + +import
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1374013380 ## streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java: ## @@ -34,6 +34,9 @@ public abstract class WrappedStateStore implements StateStore, CachedStateStore { public static boolean isTimestamped(final StateStore stateStore) { +if (stateStore instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) { Review Comment: But `TimestampedBytesStore` currently looks like this: ``` public interface TimestampedBytesStore { static byte[] convertToTimestampedFormat(final byte[] plainValue) { if (plainValue == null) { return null; } return ByteBuffer .allocate(8 + plainValue.length) .putLong(NO_TIMESTAMP) .put(plainValue) .array(); } } ``` Do you mean we should implement a method that returns true in this interface? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15201: Allow git push to fail gracefully [kafka]
Owen-CH-Leung commented on code in PR #14645: URL: https://github.com/apache/kafka/pull/14645#discussion_r1374005528 ## release.py: ## @@ -730,7 +730,7 @@ def select_gpg_key(): fail("Ok, giving up") if not user_ok("Ok to push RC tag %s (y/n)?: " % rc_tag): fail("Ok, giving up") -cmd("Pushing RC tag", "git push %s %s" % (PUSH_REMOTE_NAME, rc_tag)) +cmd("Pushing RC tag", "git push %s %s" % (PUSH_REMOTE_NAME, rc_tag), num_retries=0) Review Comment: If we put `allow_failure=True`, then when `git push` fails, the program will continue to run the remaining lines after `git push` instead of failing gracefully. So I think we should just keep `num_retries=0`, so that it triggers the `fail` function and proceeds to do the cleanup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]
yyu1993 commented on code in PR #14545: URL: https://github.com/apache/kafka/pull/14545#discussion_r1373996821 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1216,15 +1228,21 @@ class LogManager(logDirs: Seq[File], cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition)) } } -removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false) +if (isStray) { Review Comment: Got it. Makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15689: Logging skipped event when expected migration state is wrong [kafka]
showuon commented on PR #14646: URL: https://github.com/apache/kafka/pull/14646#issuecomment-1782174276 @ppatierno , please resolve the conflicts. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434
[ https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15319: Fix Version/s: 3.5.2 > Upgrade rocksdb to fix CVE-2022-37434 > - > > Key: KAFKA-15319 > URL: https://issues.apache.org/jira/browse/KAFKA-15319 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.1 >Reporter: Maruthi >Assignee: Lucas Brutschy >Priority: Critical > Fix For: 3.6.0, 3.5.2 > > Attachments: compat_report.html.zip > > > Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12 > Upgrade to 1.2.13 to fix > https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]
mjsax commented on PR #14216: URL: https://github.com/apache/kafka/pull/14216#issuecomment-1782162923 Cherry-picked this to `3.5` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]
mjsax commented on code in PR #14596: URL: https://github.com/apache/kafka/pull/14596#discussion_r1373965049 ## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. Review Comment: We should add `@param <K>` and `@param <V>` to describe the generic types ## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. + */ +@Evolving +public final class VersionedKeyQuery implements Query> { + +private final K key; +private final Optional asOfTimestamp; + +private VersionedKeyQuery(final K key, final Optional asOfTimestamp) { +this.key = Objects.requireNonNull(key); +this.asOfTimestamp = asOfTimestamp; +} + +/** + * Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @param The type of the key + * @param The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ +public static VersionedKeyQuery withKey(final K key) { +return new VersionedKeyQuery<>(key, Optional.empty()); +} + +/** + * Specifies the timestamp for the key query. The key query returns the record version for the specified timestamp. Review Comment: `record's` ? (not sure) ## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1373981478 ## streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java: ## @@ -34,6 +34,9 @@ public abstract class WrappedStateStore implements StateStore, CachedStateStore { public static boolean isTimestamped(final StateStore stateStore) { +if (stateStore instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) { Review Comment: We should not add this case, but rather let `KeyValueToTimestampedKeyValueByteStoreAdapter` implement the marker interface `TimestampedBytesStore` such that the existing condition returns `true` for this store. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
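The marker-interface approach mjsax suggests can be shown with a tiny self-contained sketch (hypothetical simplified types, not the real Streams classes): an empty interface tags the capability, and a single `instanceof` check covers every implementing store without naming any class explicitly.

```java
public class MarkerInterfaceSketch {

    // Marker interface: no methods, purely a type-level tag
    // (standing in for Kafka Streams' TimestampedBytesStore).
    interface TimestampedBytesStore { }

    // Hypothetical simplified store types.
    static class PlainKeyValueStore { }

    // The adapter implements the marker, so the generic check below
    // covers it without an explicit instanceof for this class.
    static class TimestampedAdapterStore implements TimestampedBytesStore { }

    // Generic capability check, analogous to WrappedStateStore.isTimestamped().
    static boolean isTimestamped(Object store) {
        return store instanceof TimestampedBytesStore;
    }

    public static void main(String[] args) {
        System.out.println(isTimestamped(new PlainKeyValueStore()));      // false
        System.out.println(isTimestamped(new TimestampedAdapterStore())); // true
    }
}
```

This is why the review prefers implementing the marker over adding a special case: new timestamped adapters opt in by implementing the interface, and `isTimestamped` never needs another branch.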
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1373977415 ## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ## @@ -346,6 +346,23 @@ public static Function getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), byteArray); } +@SuppressWarnings({"unchecked", "rawtypes"}) +public static Function getDeserializeValue2(final StateSerdes serdes, + final StateStore wrapped, + final boolean isDSLStore) { +final Serde valueSerde = serdes.valueSerde(); +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isDSLStore; +final Deserializer deserializer; +if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review Comment: This only solves `KeyQuery` temporarily; I am now debugging `RangeQuery`. We should convert the `RocksDBRangeIterator` to a `KeyValueToTimestampedKeyValueIteratorAdapter` in `KeyValueToTimestampedKeyValueByteStoreAdapter`, because we want to turn the plain kv iterator into a ts-kv iterator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
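The kv-iterator to ts-kv-iterator conversion described here could, in rough outline, wrap the plain iterator and prepend an 8-byte timestamp (with -1 as the unknown-timestamp sentinel, matching the conversion done for old-format values). The type below is a simplified stand-in for `KeyValueToTimestampedKeyValueIteratorAdapter`, not the real implementation:

```java
import java.nio.ByteBuffer;
import java.util.AbstractMap.SimpleEntry;
import java.util.Iterator;
import java.util.Map;

// Wraps a plain key/value byte iterator and converts each value into the
// "timestamped" format (8-byte timestamp prefix followed by the plain value)
// lazily, as the caller iterates.
class TimestampedIteratorAdapter implements Iterator<Map.Entry<byte[], byte[]>> {
    private static final long UNKNOWN_TIMESTAMP = -1L;  // old-format data has no timestamp
    private final Iterator<Map.Entry<byte[], byte[]>> inner;

    TimestampedIteratorAdapter(final Iterator<Map.Entry<byte[], byte[]>> inner) {
        this.inner = inner;
    }

    public boolean hasNext() {
        return inner.hasNext();
    }

    public Map.Entry<byte[], byte[]> next() {
        final Map.Entry<byte[], byte[]> plain = inner.next();
        final ByteBuffer buf = ByteBuffer.allocate(8 + plain.getValue().length);
        buf.putLong(UNKNOWN_TIMESTAMP);  // sentinel timestamp prefix
        buf.put(plain.getValue());
        return new SimpleEntry<>(plain.getKey(), buf.array());
    }
}
```

Doing the conversion inside the iterator keeps a range query streaming: values are rewritten one at a time instead of materializing the whole range in the timestamped format.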
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1373970782 ## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ## @@ -346,6 +346,23 @@ public static Function getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), byteArray); } +@SuppressWarnings({"unchecked", "rawtypes"}) +public static Function getDeserializeValue2(final StateSerdes serdes, + final StateStore wrapped, + final boolean isDSLStore) { +final Serde valueSerde = serdes.valueSerde(); +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isDSLStore; +final Deserializer deserializer; +if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review Comment: Yes, I found that bug; it is in `WrappedStateStore`. We should add a check like this: ``` public static boolean isTimestamped(final StateStore stateStore) { if (stateStore instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) { return true; } ``` But this alone does not fully solve the ROCK_KV DSL store case with cache = true: when cache = true, we get the data from `CachingKeyValueStore`, not from `KeyValueToTimestampedKeyValueByteStoreAdapter`. So we still have to force `timestamped` to be true in `getDeserializeValue`, because `CachingKeyValueStore` is not itself a ts-kv store even though the ROCK_KV DSL store reads its data through it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1373970782 ## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ## @@ -346,6 +346,23 @@ public static Function getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), byteArray); } +@SuppressWarnings({"unchecked", "rawtypes"}) +public static Function getDeserializeValue2(final StateSerdes serdes, + final StateStore wrapped, + final boolean isDSLStore) { +final Serde valueSerde = serdes.valueSerde(); +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isDSLStore; +final Deserializer deserializer; +if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review Comment: Yes, I found that bug; it is in `WrappedStateStore`. We should add a check like this: ``` public static boolean isTimestamped(final StateStore stateStore) { if (stateStore instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) { return true; } if (stateStore instanceof TimestampedBytesStore) { return true; } else if (stateStore instanceof WrappedStateStore) { ``` But this alone does not fully solve the ROCK_KV DSL store case with cache = true; we still have to force `timestamped` to be true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15527: Update Javadoc for range and reverseRange in ReadOnlyKeyValueStore and update upgrade-guide.html for kafka streams webpage [kafka]
mjsax merged PR #14600: URL: https://github.com/apache/kafka/pull/14600 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1373953314 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -126,15 +128,22 @@ public QueryResult query( final PositionBound positionBound, final QueryConfig config) { - final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +Position position = result.getPosition(); +if (result.isSuccess()) { +byte[] res = (byte[]) result.getResult(); +byte[] res1 = convertToTimestampedFormat(res); +result = (QueryResult) QueryResult.forResult(res1); +} + if (config.isCollectExecutionInfo()) { final long end = System.nanoTime(); result.addExecutionInfo( "Handled in " + getClass() + " in " + (end - start) + "ns" ); } +result.setPosition(position); Review Comment: Why this change? ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -126,15 +128,22 @@ public QueryResult query( final PositionBound positionBound, final QueryConfig config) { - final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +Position position = result.getPosition(); +if (result.isSuccess()) { +byte[] res = (byte[]) result.getResult(); +byte[] res1 = convertToTimestampedFormat(res); +result = (QueryResult) QueryResult.forResult(res1); Review Comment: We need to set the position on the newly created result. We should also copy the "execution information" (also wondering if we should add a new entry to it?) \cc @aliehsaeedii WDYT? Maybe we can actually re-use `InternalQueryResultUtil.copyAndSubstituteDeserializedResult` ? 
## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ## @@ -126,15 +128,22 @@ public QueryResult query( final PositionBound positionBound, final QueryConfig config) { - final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; -final QueryResult result = store.query(query, positionBound, config); +QueryResult result = store.query(query, positionBound, config); +Position position = result.getPosition(); +if (result.isSuccess()) { +byte[] res = (byte[]) result.getResult(); +byte[] res1 = convertToTimestampedFormat(res); Review Comment: Use better variable names. E.g. `res` -> `plainValue` and `res1` -> `valueWithTimestamp` ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ## @@ -102,4 +140,230 @@ static class RawAndDeserializedValue { this.value = value; } } + +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final QueryConfig config) { + +final long start = time.nanoseconds(); +final QueryResult result; + +final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, config); +if (config.isCollectExecutionInfo()) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +config, +this +); +if (config.isCollectExecutionInfo()) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (time.nanoseconds() - start) + "ns"); +} +} +return result; +} + + + +@SuppressWarnings("unchecked") +protected QueryResult runTimestampKeyQuery(final Query query, + final PositionBound positionBound, + final QueryConfig config) { +final QueryResult result; +final TimestampedKeyQuery typedKeyQuery =
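The per-query-type dispatch in the `MeteredTimestampedKeyValueStore#query` hunk above — look the handler up by the query's class and fall back to the wrapped store otherwise — can be sketched standalone roughly like this (all types are simplified stand-ins, not the real IQv2 API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

interface Query<R> { }  // stand-in for the IQv2 Query marker type

class TimestampedKeyQuery implements Query<String> { }

// Minimal illustration of the queryHandlers dispatch: known query classes get
// a dedicated handler; anything unrecognized is delegated downward (the
// wrapped().query(...) call in the real code).
class QueryDispatcher {
    private final Map<Class<?>, Function<Query<?>, Object>> queryHandlers = new HashMap<>();

    QueryDispatcher() {
        queryHandlers.put(TimestampedKeyQuery.class, q -> "handled:timestamped-key-query");
    }

    Object query(final Query<?> query) {
        final Function<Query<?>, Object> handler = queryHandlers.get(query.getClass());
        if (handler == null) {
            return "delegated-to-wrapped-store";  // fall through to the inner store
        }
        return handler.apply(query);  // store-level handling, e.g. with serdes applied
    }
}
```

Keying the map on `query.getClass()` is what lets a metered store add support for new query types (like the proposed `TimestampedKeyQuery`) without touching the delegation path for everything else.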
[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.
[ https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15690: Component/s: streams unit tests > EosIntegrationTest is flaky. > > > Key: KAFKA-15690 > URL: https://issues.apache.org/jira/browse/KAFKA-15690 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Calvin Liu >Priority: Major > > EosIntegrationTest > shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, > processing threads = false] > {code:java} > org.junit.runners.model.TestTimedOutException: test timed out after 600 > seconds at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:) > at > org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821) > at > org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779) >at > org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837) > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_1, processor=KSTREAM-SOURCE-00, > topic=multiPartitionInputTopic, partition=1, offset=15, > stacktrace=java.lang.RuntimeException: Detected we've been interrupted. 
> at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) >at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) >at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > {code} > shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing > threads = false] > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. at > org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286) >at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274) >at > org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_1, processor=KSTREAM-SOURCE-00, > topic=multiPartitionInputTopic, partition=1, offset=15, > stacktrace=java.lang.RuntimeException: Detected we've been interrupted. 
> at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) >at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) >at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > {code} > shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing > threads = false] > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > StreamsTasks did not request commit. ==> expected: but was: >at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) >at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) >at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) > java.lang.IllegalStateException: Replica > [Topic=__transaction_state,Partition=2,Replica=1] should be in the > OfflineReplica,ReplicaDeletionStarted states before moving to > ReplicaDeletionIneligible state. Instead it is in OnlineReplica state > at >
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
kirktrue commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1373924481 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = "generic"; +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support GENERIC or CONSUMER. If CONSUMER is specified, then the consumer group protocol will be used. " + Review Comment: I think we should use the lower-case versions of these strings: ```suggestion "support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be used. " + ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = "generic"; +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support GENERIC or CONSUMER. If CONSUMER is specified, then the consumer group protocol will be used. 
" + +"Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server side assignor to use. It cannot be used in " + +"conjunction with group.local.assignor. The group coordinator will choose the assignor if no " + Review Comment: Two questions: 1. Is `group.local.assignor` the same as `group.local.assignors`? 2. If `group.local.assignors` isn't in this PR, should we just omit that sentence? ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), Review Comment: You need a wider monitor, @philipnee ```suggestion ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)), ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), Review Comment: You need a wider monitor, @philipnee ```suggestion ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)), ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static 
final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String
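The case-insensitive enum validation discussed in this review (`ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class))`) behaves roughly like the following stand-in sketch; `GroupProtocol` and the helper here are illustrative, not the real client classes:

```java
import java.util.Locale;

// Stand-in for the client's GroupProtocol enum: "generic" is the classic
// rebalance protocol, "consumer" the new KIP-848 consumer group protocol.
enum GroupProtocol { GENERIC, CONSUMER }

class GroupProtocolConfig {
    static final String DEFAULT_GROUP_PROTOCOL = "generic";

    // Accepts any casing of the enum names (as CaseInsensitiveValidString would)
    // and returns the canonical lower-case form; rejects anything unknown.
    static String validate(final String value) {
        for (final GroupProtocol p : GroupProtocol.values()) {
            if (p.name().equalsIgnoreCase(value)) {
                return p.name().toLowerCase(Locale.ROOT);
            }
        }
        throw new IllegalArgumentException("Invalid group.protocol: " + value);
    }
}
```

Normalizing to lower case is also the point of the doc-string suggestion earlier in this review: the user-facing documentation should quote the values in the same form the config actually canonicalizes to.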
[PR] [KIP-954] support custom DSL store providers [kafka]
agavra opened a new pull request, #14648: URL: https://github.com/apache/kafka/pull/14648 ### Overview Implementation for [KIP-954](https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types) ### Testing Strategy * Updated the topology tests to ensure that the configuration is picked up in the topology builder * Manually built a Kafka Streams application using a custom `DslStoreSuppliers` class and verified that it was used ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
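The KIP-954 mechanism this PR implements — pick the DSL's default store implementation from a user-configured suppliers class — can be sketched as below. These types are illustrative stand-ins (not the real `org.apache.kafka.streams` API), and the config key name mirrors the KIP rather than quoting the final `StreamsConfig` constant:

```java
import java.util.Map;

// Stand-in for the pluggable suppliers interface from KIP-954: implementations
// decide which physical store backs each DSL state store.
interface DslStoreSuppliers {
    String keyValueStore(String name);
}

class InMemoryDslStoreSuppliers implements DslStoreSuppliers {
    public String keyValueStore(final String name) {
        return "in-memory:" + name;
    }
}

class DslStoreConfig {
    // Illustrative config key, modeled on the KIP's proposal.
    static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";

    // Streams similarly instantiates the configured class reflectively when
    // building the topology, so custom types plug in without code changes.
    static DslStoreSuppliers fromProps(final Map<String, Object> props) {
        final Class<?> clazz = (Class<?>) props.get(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
        try {
            return (DslStoreSuppliers) clazz.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + clazz, e);
        }
    }
}
```

This is why the PR's topology tests focus on the configuration being "picked up in the topology builder": the interesting behavior is that the configured class, not a hard-coded default, supplies the stores.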
Re: [PR] Kafka 15680 [kafka]
C0urante commented on PR #14630: URL: https://github.com/apache/kafka/pull/14630#issuecomment-1782075383 I can take a look next week. In the meantime, do you think you could add some unit tests for this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]
jolshan commented on code in PR #14627: URL: https://github.com/apache/kafka/pull/14627#discussion_r1373924152 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -360,7 +360,9 @@ public short partitionRecordVersion() { } public short fetchRequestVersion() { -if (this.isAtLeast(IBP_3_5_IV1)) { +if (this.isAtLeast(IBP_3_7_IV0)) { Review Comment: I just realized that if we set the version as unstable, we may not be able to use it here. 🤦‍♀️ Maybe we should remove marking the version as unstable if this causes issues in tests. Sorry for the confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
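The version-gating pattern under discussion — compare against a `MetadataVersion` with `isAtLeast()` and pick the RPC version accordingly — can be sketched with a stand-in enum. The names follow Kafka's IBP naming, but the enum and the returned version numbers here are illustrative placeholders, not the real `org.apache.kafka.server.common.MetadataVersion` values:

```java
// Declaration order doubles as release order, so ordinal comparison gives a
// natural isAtLeast() check. Newer versions may be flagged "unstable" until
// released, which is the concern raised in the review comment above.
enum MiniMetadataVersion {
    IBP_3_4_IV0,
    IBP_3_5_IV1,
    IBP_3_7_IV0;  // newest last

    boolean isAtLeast(final MiniMetadataVersion other) {
        return this.ordinal() >= other.ordinal();
    }

    // Placeholder RPC versions: each new IBP can unlock a newer request version.
    short fetchRequestVersion() {
        if (this.isAtLeast(IBP_3_7_IV0)) {
            return (short) 3;
        } else if (this.isAtLeast(IBP_3_5_IV1)) {
            return (short) 2;
        }
        return (short) 1;
    }
}
```

The reviewer's worry follows directly from this structure: if the newest constant is marked unstable and filtered out in test environments, any branch gated on `isAtLeast(newest)` never fires there.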
[jira] [Updated] (KAFKA-14419) Failed SyncGroup leading to partitions lost due to processing during rebalances
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14419: --- Summary: Failed SyncGroup leading to partitions lost due to processing during rebalances (was: Same message consumed again by the same stream task after partition is lost and reassigned) > Failed SyncGroup leading to partitions lost due to processing during > rebalances > --- > > Key: KAFKA-14419 > URL: https://issues.apache.org/jira/browse/KAFKA-14419 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 > Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64 >Reporter: Mikael >Priority: Major > > Trigger scenario: > Four Kafka client application instances on separate EC2 instances with a > total of 8 active and 8 standby stream tasks for the same stream topology, > consuming from an input topic with 8 partitions. Sometimes a handful of > messages are consumed twice by one of the stream tasks when stream tasks on > another application instance join the consumer group after an application > instance restart. > Additional information: > Messages are produced to the topic by another Kafka streams topology deployed > on the same four application instances. I have verified that each message is > only produced once by enabling debug logging in the topology flow right > before producing each message to the topic. 
> Logs from stream thread with duplicate consumption: > > {code:java} > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is > already rebalancing > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > Input records consumed for the first time > 2022-11-21 15:09:33,919 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,920 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began > another rebalance. Need to re-join the group. 
Sent generation was > Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,923 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as > lost since
[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780119#comment-17780119 ] A. Sophie Blee-Goldman commented on KAFKA-14419: [~Carlstedt] first off, do you by any chance have especially long processing latencies? For example iterating over a large range of a state store, making a blocking remote call, etc. Since you're using the default max poll interval, I might recommend starting out by lowering the max.poll.records first, as you generally don't want to make the max.poll.interval too small – ideally that would be the last resort. IIRC the default max.poll.records in Streams is set to 1,000 – so maybe try cutting that down to just 100? You also can (should) experiment a bit to find a good balance for your app in the steady state. Though I suppose if you're seeing this frequently enough/able to reproduce it reliably, you could set it to something extremely low as a test, like 10 let's say, just to see if that solves the issue. I'll try to put together a PR for this sometime soon, maybe early next week, so you can also wait for that and try running with a patched version of Streams. (Would a trunk/3.7 patched version be alright? I'm happy to create a branch with the fix ported to an earlier version of Kafka Streams if you'd prefer, just let me know which version you need) Btw: I'm going to update the ticket title if you don't mind, so that it reflects the bug described in my last response. 
The current title is just describing the correct behavior of Kafka Streams working as intended > Same message consumed again by the same stream task after partition is lost > and reassigned > -- > > Key: KAFKA-14419 > URL: https://issues.apache.org/jira/browse/KAFKA-14419 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 > Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64 >Reporter: Mikael >Priority: Major > > Trigger scenario: > Four Kafka client application instances on separate EC2 instances with a > total of 8 active and 8 standby stream tasks for the same stream topology, > consuming from an input topic with 8 partitions. Sometimes a handful of > messages are consumed twice by one of the stream tasks when stream tasks on > another application instance join the consumer group after an application > instance restart. > Additional information: > Messages are produced to the topic by another Kafka streams topology deployed > on the same four application instances. I have verified that each message is > only produced once by enabling debug logging in the topology flow right > before producing each message to the topic. 
> Logs from stream thread with duplicate consumption: > > {code:java} > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is > already rebalancing > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > Input records consumed for the first time > 2022-11-21 15:09:33,919 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,920 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began > another rebalance. Need to re-join the group. 
Sent generation was > Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', >
[jira] [Comment Edited] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779731#comment-17779731 ] A. Sophie Blee-Goldman edited comment on KAFKA-14419 at 10/26/23 10:49 PM: --- Hey, sorry for the long delay – I'm still trying to catch up my memory of this ticket and the related one, but after looking at it again with fresh eyes I think I figured out what's going on here. If I'm reading this situation correctly, it does seem like there is some less-than-ideal behavior that we might be able to improve. Based on your recent logs, I think the root cause here is basically the same as what I fixed in https://github.com/apache/kafka/pull/12869, just to a lesser degree. The issue in that patch was that Streams would sometimes trigger a followup rebalance even while the current rebalance was still going on, which led some members to drop out of the group upon hitting a REBALANCE_IN_PROGRESS error during the SyncGroup phase. The fix basically just made the StreamThread wait until the rebalance was over before triggering a followup. This should have been sufficient, but I suppose it is still theoretically possible to run into the same issue. Taking a deeper look at the original issue, it would only arise because of how Streams uses a non-blocking poll, which allows it to return to its main loop and continue processing in the background during a rebalance. 
A lot of things happen throughout the loop, but the relevant operations here are as follows:

1. Check the rebalance "schedule" and trigger one if:
   - it has been requested for a time equal to or less than the current time
   - the consumer is not actively participating in a rebalance (i.e. sometime after a SyncGroup response is received but before sending a new JoinGroup request)
2. Poll for more records, during which time either or both of the following may occur:
   - the consumer enters a new rebalance by sending a JoinGroup request
   - the consumer participates in a rebalance by receiving the JoinGroup response and sending a SyncGroup request
   - the consumer completes an ongoing rebalance by receiving a SyncGroup response, after which it can commit offsets for revoked tasks and initialize new ones
3. Process more records, which might have been either:
   - newly consumed during the last poll call, or
   - left over from a previous batch that could not be fully processed before needing to return to poll due to running out of time in the max.poll.interval

So here's what I'm imagining: let's say we have two consumers, A and B, with A being the group leader/assignor.

1. A new rebalance begins, and both threads send their JoinGroup requests before returning to process some records
2. A doesn't have many records left to process, so it quickly returns to the poll call in step 2 of the loop. However B is still processing a large backlog
3. A performs the assignment and determines that a followup rebalance is needed, so it sets the rebalance schedule
4. After the assignment, A sends it out in the SyncGroup request and exits the poll call
5. A does some processing (or not) before returning to the poll and receiving the SyncGroup response
6. A exits the poll again, and this time when it reaches step 1 of the loop, it is now able to trigger the new rebalance
7. After A has requested a new rebalance, it finally returns to the poll call one more time, and rejoins the group/sends a JoinGroup request to kick it off
8. This whole time, B has had a large backlog of records, or a very high max.poll.interval, or a long GC pause – you get the idea. It's stuck in step 3
9. B finally finishes processing and leaves step 3, returning to the poll call, during which it sends a very late SyncGroup request
10. When the SyncGroup response is eventually received, B gets the REBALANCE_IN_PROGRESS error and fails its rebalance since the generation is stale

The fundamental issue here is that B can theoretically spend up to the max.poll.interval between sending its SyncGroup request and returning to poll to process the SyncGroup response, while A might process its SyncGroup response, process its records, and then trigger a new rebalance all within that timeframe. This could happen when the task assignment is heavily imbalanced, for example. I can see a few potential paths forward here, and a fourth option that is more of a temporary workaround for [~Carlstedt] if you're still encountering this. None of them are really a guarantee, but they would help. For the most comprehensive fix we might want to consider doing two or even all three of these:

Option 1: add a small delay to the Streams followup rebalance trigger to help the entire group finish the SyncGroup phase before beginning the next rebalance.
Option 2: set a shorter upper bound on the maximum time a StreamThread
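The interplay between the rebalance "schedule" check and the non-blocking poll described above can be sketched as a minimal model. All names here are hypothetical; this is a simplified illustration of the loop structure, not the actual Kafka Streams StreamThread code:

```java
import java.time.Duration;

// Simplified model of the main loop: check the rebalance schedule, then do a
// non-blocking poll, then process records (possibly for a long time).
class PollLoopSketch {
    private long scheduledRebalanceMs = Long.MAX_VALUE; // MAX_VALUE = none requested
    private boolean rebalanceInProgress = false;

    // Request a follow-up rebalance at (or before) the given time.
    void requestRebalance(long whenMs) {
        scheduledRebalanceMs = Math.min(scheduledRebalanceMs, whenMs);
    }

    // Step 1 of the loop: trigger a requested rebalance only if its scheduled
    // time has arrived AND the consumer is between rebalances.
    boolean shouldTriggerRebalance(long nowMs) {
        return scheduledRebalanceMs <= nowMs && !rebalanceInProgress;
    }

    void runOnce(long nowMs) {
        if (shouldTriggerRebalance(nowMs)) {
            scheduledRebalanceMs = Long.MAX_VALUE;
            rebalanceInProgress = true; // a rejoin is enqueued for the next poll
        }
        poll(Duration.ZERO);  // step 2: JoinGroup/SyncGroup may progress here
        processRecords();     // step 3: may run for up to max.poll.interval.ms
    }

    void poll(Duration timeout) { /* consumer.poll(timeout) */ }
    void processRecords() { /* newly consumed or leftover records */ }
}
```

In this model, a slow `processRecords()` on member B is exactly the window in which a fast member A can complete the current rebalance and schedule the next one.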
Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]
ocadaruma commented on code in PR #14242: URL: https://github.com/apache/kafka/pull/14242#discussion_r1373898465 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -308,7 +308,14 @@ public void truncateFromEnd(long endOffset) { if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); -flush(); +// We intentionally don't force flushing change to the device here because: +// - To avoid fsync latency +// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives +// * This method is called by ReplicaFetcher threads, which could block replica fetching +// then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. +// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by +// another truncateFromEnd call on log loading procedure so it won't be a problem +flush(false); Review Comment: > those old entries could still exist in the file Yeah, precisely, the content on the device (not the file) could still be old. As long as we read the file in the usual way (i.e. not through O_DIRECT), we can see the latest data. The staleness on the device arises only when the server experiences a power failure before the OS flushes the page cache. In this case, the content could indeed be rolled back to an old state. But it won't be a problem because the leader-epoch file will be truncated again to match the log file during the loading procedure anyway (this is the case mentioned in (3) in the PR description) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
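The pattern discussed in this review (rewrite the checkpoint file through the page cache, but skip the fsync on hot paths, relying on startup recovery to repair any staleness after power loss) can be sketched as follows. Names are hypothetical; this is a simplified model of the technique, not the actual LeaderEpochFileCache code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;

// Writes a checkpoint via a temp file + atomic rename. When sync == false the
// data reaches the page cache only; readers still see the latest content, and
// only a power failure can roll the on-device state back to an older version.
class CheckpointSketch {
    private final Path path;

    CheckpointSketch(Path path) { this.path = path; }

    void flush(String content, boolean sync) throws IOException {
        Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        if (sync) {
            // Force the data to the device; this is where fsync latency is paid.
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                ch.force(true);
            }
        }
        // Atomic rename: readers see either the old or the new content, never a mix.
        Files.move(tmp, path, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

A caller on a latency-sensitive path (such as a fetcher thread in the discussion above) would pass `sync = false`, provided some later recovery step can re-derive the checkpoint after an unclean shutdown.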
[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.
[ https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15690: --- Description: EosIntegrationTest shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, processing threads = false] {code:java} org.junit.runners.model.TestTimedOutException: test timed out after 600 seconds at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:) at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821) at org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779) at org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837) org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=multiPartitionInputTopic, partition=1, offset=15, stacktrace=java.lang.RuntimeException: Detected we've been interrupted. at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) {code} shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing threads = false] {code:java} java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. 
at org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204) at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286) at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274) at org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=multiPartitionInputTopic, partition=1, offset=15, stacktrace=java.lang.RuntimeException: Detected we've been interrupted. at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) {code} shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing threads = false] {code:java} org.opentest4j.AssertionFailedError: Condition not met within timeout 6. StreamsTasks did not request commit. 
==> expected: but was: at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) java.lang.IllegalStateException: Replica [Topic=__transaction_state,Partition=2,Replica=1] should be in the OfflineReplica,ReplicaDeletionStarted states before moving to ReplicaDeletionIneligible state. Instead it is in OnlineReplica state at kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442) at kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164) at kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164) at scala.collection.immutable.List.foreach(List.scala:333) {code} They are running long and may relate to timeout. was: Finding the following integration tests flaky. EosIntegrationTest { * shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, processing
Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]
CalvinConfluent commented on PR #14603: URL: https://github.com/apache/kafka/pull/14603#issuecomment-1781972169 https://issues.apache.org/jira/browse/KAFKA-15699 https://issues.apache.org/jira/browse/KAFKA-15700
[jira] [Created] (KAFKA-15700) FetchFromFollowerIntegrationTest is flaky
Calvin Liu created KAFKA-15700: -- Summary: FetchFromFollowerIntegrationTest is flaky Key: KAFKA-15700 URL: https://issues.apache.org/jira/browse/KAFKA-15700 Project: Kafka Issue Type: Bug Reporter: Calvin Liu It may relate to inappropriate timeout. testRackAwareRangeAssignor(String).quorum=zk {code:java} java.util.concurrent.TimeoutException at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$13(FetchFromFollowerIntegrationTest.scala:229) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$13$adapted(FetchFromFollowerIntegrationTest.scala:228) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15699: --- Description: It may relate to inappropriate test timeout testReplicateSourceDefault() {code:java} org.opentest4j.AssertionFailedError: `delete.retention.ms` should be different, because it's in exclude filter! ==> expected: not equal but was: <8640> at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testReplicateSourceDefault$9(MirrorConnectorsIntegrationBaseTest.java:826){code} testOffsetSyncsTopicsOnTarget() {code:java} java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1698275006778, tries=1, nextAllowedTryMs=1698275715972) timed out at 1698275715878 after 1 attempt(s) at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103){code} was: May relate to inaproparate testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget() } > MirrorConnectorsIntegrationBaseTest is flaky > > > Key: KAFKA-15699 > URL: https://issues.apache.org/jira/browse/KAFKA-15699 > Project: Kafka > Issue Type: Bug >Reporter: Calvin Liu >Priority: Major > > It may relate to inappropriate test timeout > testReplicateSourceDefault() > {code:java} > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be > 
different, because it's in exclude filter! ==> expected: not equal but was: > <8640> > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testReplicateSourceDefault$9(MirrorConnectorsIntegrationBaseTest.java:826){code} > testOffsetSyncsTopicsOnTarget() > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, > deadlineMs=1698275006778, tries=1, nextAllowedTryMs=1698275715972) timed out > at 1698275715878 after 1 attempt(s)at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) >at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103){code}
[jira] [Updated] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15699: --- Description: May relate to inappropriate testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget() } > MirrorConnectorsIntegrationBaseTest is flaky > > > Key: KAFKA-15699 > URL: https://issues.apache.org/jira/browse/KAFKA-15699 > Project: Kafka > Issue Type: Bug >Reporter: Calvin Liu >Priority: Major > > May relate to inappropriate > testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget() }
Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]
mumrah merged PR #14545: URL: https://github.com/apache/kafka/pull/14545
[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.
[ https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15690: --- Summary: EosIntegrationTest is flaky. (was: Flaky integration tests) > EosIntegrationTest is flaky. > > > Key: KAFKA-15690 > URL: https://issues.apache.org/jira/browse/KAFKA-15690 > Project: Kafka > Issue Type: Bug >Reporter: Calvin Liu >Priority: Major > > Finding the following integration tests flaky. > EosIntegrationTest { > * > shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, > processing threads = false] > * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing > threads = false] > * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing > threads = false] > } > MirrorConnectorsIntegrationBaseTest { > * testReplicateSourceDefault() > * testOffsetSyncsTopicsOnTarget() > } > FetchFromFollowerIntegrationTest { > * testRackAwareRangeAssignor(String).quorum=zk > } > They are running long and may relate to timeout.
[jira] [Created] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky
Calvin Liu created KAFKA-15699: -- Summary: MirrorConnectorsIntegrationBaseTest is flaky Key: KAFKA-15699 URL: https://issues.apache.org/jira/browse/KAFKA-15699 Project: Kafka Issue Type: Bug Reporter: Calvin Liu
Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]
mumrah commented on PR #14545: URL: https://github.com/apache/kafka/pull/14545#issuecomment-1781961533 Test failures look unrelated. (Screenshot attachment: https://github.com/apache/kafka/assets/55116/b0eb9cec-76df-4d5e-ab73-dfd2ca4d5978)
Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]
mumrah commented on code in PR #14545: URL: https://github.com/apache/kafka/pull/14545#discussion_r1373863647 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1216,15 +1228,21 @@ class LogManager(logDirs: Seq[File], cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition)) } } -removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false) +if (isStray) { Review Comment: For now, the desired behavior is to stop tracking the partition but not delete the files. Since migrations are one-off and inherently risky, I didn't want to take any destructive actions like deleting the logs (immediately or delayed). The stray'd partitions are logged at the INFO level when they are detected, and at WARN on subsequent startups. This gives operators the information needed to clean up stray partitions if desired. I filed https://issues.apache.org/jira/browse/KAFKA-15698 to track automatic clean up of the stray partitions.
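The non-destructive handling described in this comment (stray replicas are dropped from tracking and surfaced to the operator via logs, while normal deletions go through the usual rename-and-delete-later path) might be sketched as follows. All names here are invented for illustration and are not the actual LogManager code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: stop tracking a partition, taking a destructive action only for
// normal deletions; stray partitions are merely logged and left on disk.
class StrayHandlingSketch {
    private final Map<String, String> logDirByPartition = new HashMap<>();
    private final List<String> deletionQueue = new ArrayList<>();

    void addLog(String partition, String dir) {
        logDirByPartition.put(partition, dir);
    }

    // Returns a description of the action taken.
    String stopTracking(String partition, boolean isStray) {
        String dir = logDirByPartition.remove(partition);
        if (dir == null) return "unknown partition " + partition;
        if (isStray) {
            // Non-destructive: the files stay put; the operator decides.
            return "INFO stray partition " + partition + " left on disk at " + dir;
        }
        // Normal path: queue the renamed directory for asynchronous deletion.
        deletionQueue.add(dir);
        return "scheduled delete of " + dir;
    }
}
```

The design choice mirrors the comment above: during a risky one-off migration, logging beats deleting, and automated cleanup can be layered on later (as KAFKA-15698 proposes).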
[jira] [Created] (KAFKA-15698) KRaft mode brokers should clean up stray partitions from migration
David Arthur created KAFKA-15698: Summary: KRaft mode brokers should clean up stray partitions from migration Key: KAFKA-15698 URL: https://issues.apache.org/jira/browse/KAFKA-15698 Project: Kafka Issue Type: Improvement Reporter: David Arthur Follow up to KAFKA-15605. After the brokers are migrated to KRaft and the migration is completed, we should let the brokers clean up any partitions that we marked as "stray" during the migration. This would be any partition that was being deleted when the migration began, or any partition that was deleted, but not seen as deleted by StopReplica (e.g., broker down).
Re: [PR] KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test [kafka]
ableegoldman merged PR #14608: URL: https://github.com/apache/kafka/pull/14608
Re: [PR] KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test [kafka]
ableegoldman commented on code in PR #14608: URL: https://github.com/apache/kafka/pull/14608#discussion_r1373854238 ## streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java: ## @@ -523,13 +527,13 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception final Map kafkaStreams1Configuration = mkMap( mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1"), - mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), appId + "-ks1"), Review Comment: ah, yeah, good catch -- we should definitely not be using the `group.instance.id` config in tests, turning on static membership will delay the test at best (and mess with it at worst)
Re: [PR] KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test [kafka]
ableegoldman commented on PR #14608: URL: https://github.com/apache/kafka/pull/14608#issuecomment-1781945353 Test failures are unrelated, and the relevant test passed in all cases, so let's merge this
Re: [PR] KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test [kafka]
ableegoldman commented on code in PR #14608: URL: https://github.com/apache/kafka/pull/14608#discussion_r1373854238 ## streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java: ## @@ -523,13 +527,13 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception final Map kafkaStreams1Configuration = mkMap( mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1"), - mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), appId + "-ks1"), Review Comment: ah, yeah, good catch -- we should definitely not be using the `group.instance.id` config in tests, it turns on static membership which will delay the test at best (and mess with it at worst)
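For context on the review comment above: static membership is enabled purely by setting `group.instance.id` on the consumer, and with it the broker waits up to `session.timeout.ms` before reassigning a departed member's partitions, which is what can stall an integration test. A sketch of the config difference (property values here are made up for illustration):

```java
import java.util.Properties;

// Sketch: the only config-level difference between dynamic and static
// membership is the presence of group.instance.id.
class MembershipConfigSketch {
    static Properties dynamicMember(String groupId) {
        Properties p = new Properties();
        p.setProperty("group.id", groupId);
        // No group.instance.id: the member joins with a fresh member id and
        // its departure triggers a prompt rebalance.
        return p;
    }

    static Properties staticMember(String groupId, String instanceId) {
        Properties p = dynamicMember(groupId);
        // Enables static membership: the broker holds the member's assignment
        // for up to session.timeout.ms after it disconnects.
        p.setProperty("group.instance.id", instanceId);
        return p;
    }
}
```

In short-lived tests that restart clients, the dynamic variant is usually what you want, which is exactly the change being approved in this PR.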
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dongnuo123 commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1373846331 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() Review Comment: group3Response and group4Response are the same. I wanted to make sure when ``` group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception) group4Future.complete(group4Response) ``` handleOffsetFetch gives the same response.
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373822529 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -407,36 +414,41 @@ object StorageTool extends Logging { if (directories.isEmpty) { throw new TerseFailure("No log directories found in the configuration.") } - -val unformattedDirectories = directories.filter(directory => { - if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) { - true - } else if (!ignoreFormatted) { -throw new TerseFailure(s"Log directory $directory is already formatted. " + - "Use --ignore-formatted to ignore this directory and format the others.") - } else { -false - } -}) -if (unformattedDirectories.isEmpty) { +val loader = new MetaPropertiesEnsemble.Loader() +directories.foreach(loader.addLogDir(_)) +val metaPropertiesEnsemble = loader.load() +metaPropertiesEnsemble.verify(metaProperties.clusterId(), metaProperties.nodeId(), + util.EnumSet.noneOf(classOf[VerificationFlag])) + +val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble) +if (!(ignoreFormatted || copier.logDirProps().isEmpty)) { + val firstLogDir = copier.logDirProps().keySet().iterator().next() + throw new TerseFailure(s"Log directory ${firstLogDir} directory is already formatted. " + +"Use --ignore-formatted to ignore this directory and format the others.") +} +if (!copier.errorLogDirs().isEmpty) { + val firstLogDir = copier.errorLogDirs().iterator().next() + throw new TerseFailure(s"I/O error trying to read log directory ${firstLogDir}.") +} +if (copier.emptyLogDirs().isEmpty) { stream.println("All of the log directories are already formatted.") +} else { + copier.emptyLogDirs().forEach(logDir => { +val newMetaProperties = new MetaProperties.Builder(metaProperties). + setDirectoryId(copier.generateValidDirectoryId()). + build() +copier.logDirProps().put(logDir, newMetaProperties) Review Comment: Fair point. 
I added a setter function.
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dongnuo123 commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1373847099 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() +.setGroupId("group-4") +.setErrorCode(Errors.INVALID_GROUP_ID.code) + + val group5Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() Review Comment: Yeah, you're right. Let me write it to a separate method. We probably don't need this group 5
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dongnuo123 commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1373846331 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() Review Comment: group3Response and group4Response are the same. I wanted to make sure when ``` group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception) group4Future.complete(group4Response) ``` handleOffsetFetch gives the same response.
[jira] [Commented] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780100#comment-17780100 ] Philip Nee commented on KAFKA-15316: I think this is fixed. I will check > CommitRequestManager not calling RequestState callbacks > > > Key: KAFKA-15316 > URL: https://issues.apache.org/jira/browse/KAFKA-15316 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > CommitRequestManager is not triggering the RequestState callbacks that update > {_}lastReceivedMs{_}, affecting the _canSendRequest_ verification of the > RequestState
[jira] [Assigned] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-15316: -- Assignee: Philip Nee > CommitRequestManager not calling RequestState callbacks > > > Key: KAFKA-15316 > URL: https://issues.apache.org/jira/browse/KAFKA-15316 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > CommitRequestManager is not triggering the RequestState callbacks that update > {_}lastReceivedMs{_}, affecting the _canSendRequest_ verification of the > RequestState
[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15556: -- Labels: consumer-threading-refactor kip-848 (was: consumer-threading-refactor kip-848-preview) > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848 > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction.
[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15556: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848) > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
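The three copied methods named in the ticket all belong to one "node status" concern. The sketch below is an illustrative stand-in (the real code delegates to `NetworkClient` and its connection-state tracking) that shows why `isUnavailable`, `maybeThrowAuthFailure`, and `tryConnect` form a cohesive unit that leaks when duplicated into request managers:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the three node-status helpers named in the
// ticket. Not Kafka's implementation; it only shows how the trio shares
// one piece of per-node connection state.
public class NodeStatusTracker {
    public static final class NodeAuthenticationException extends RuntimeException {
        public NodeAuthenticationException(String msg) { super(msg); }
    }

    private final Map<String, Long> backoffUntilMs = new HashMap<>();
    private final Map<String, String> authFailures = new HashMap<>();

    public void recordConnectionFailure(String nodeId, long nowMs, long backoffMs) {
        backoffUntilMs.put(nodeId, nowMs + backoffMs);
    }

    public void recordAuthFailure(String nodeId, String reason) {
        authFailures.put(nodeId, reason);
    }

    // isUnavailable: the node is still inside its reconnect-backoff window.
    public boolean isUnavailable(String nodeId, long nowMs) {
        Long until = backoffUntilMs.get(nodeId);
        return until != null && nowMs < until;
    }

    // maybeThrowAuthFailure: surface a stored authentication error, if any.
    public void maybeThrowAuthFailure(String nodeId) {
        String reason = authFailures.get(nodeId);
        if (reason != null) throw new NodeAuthenticationException(reason);
    }

    // tryConnect: only initiate a connection when the node is not backing off.
    public boolean tryConnect(String nodeId, long nowMs) {
        return !isUnavailable(nodeId, nowMs);
    }
}
```

Because all three methods read the same state, scattering them across `FetchRequestManager` and `OffsetsRequestManager` duplicates that state's ownership — the leaky abstraction the ticket wants reviewed.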
Re: [PR] KAFKA-15598 & KAFKA-15461: Add integration tests for DescribeGroups API, DeleteGroups API, OffsetDelete API and ListGroups API [kafka]
dongnuo123 commented on code in PR #14537: URL: https://github.com/apache/kafka/pull/14537#discussion_r1373833449 ## core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala: ## @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.junit.jupiter.api.Assertions.fail +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testOffsetDelete(true) + } + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testOffsetDelete(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key 
= "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testOffsetDelete(false) + } + + private def testOffsetDelete(useNewProtocol: Boolean): Unit = { +if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") +} + +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +createOffsetsTopic() + +// Create the topic. +createTopic( + topic = "foo", + numPartitions = 3 +) + +for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId, memberEpoch) = joinConsumerGroup( +groupId = "grp", +useNewProtocol = useNewProtocol + ) + + // Commit offsets. + for (partitionId <- 0 to 2) { +commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "foo", + partition = partitionId, + offset = 100L + partitionId, + expectedError = Errors.NONE, + version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) +) + } + + // Delete offset with topic that the group is subscribed to. + deleteOffset( +groupId = "grp", +topic = "foo", +partition = 0, +
Re: [PR] KAFKA-15583: Enforce HWM advance only if partition is not under-min-ISR [kafka]
CalvinConfluent commented on code in PR #14594: URL: https://github.com/apache/kafka/pull/14594#discussion_r1373832091 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -1106,6 +1115,12 @@ class Partition(val topicPartition: TopicPartition, * @return true if the HW was incremented, and false otherwise. */ private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = { +if (metadataCache.isInstanceOf[KRaftMetadataCache] && interBrokerProtocolVersion.isElrSupported && eligibleLeaderReplicasEnabled) { Review Comment: Agree. Now make it a general requirement. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
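The PR under review enforces that the high watermark advances only when the partition is not under min-ISR. A hedged sketch of that gating predicate — a simplified stand-in, not the actual `Partition.maybeIncrementLeaderHW` logic:

```java
// Illustrative predicate for the rule discussed in KAFKA-15583: the leader's
// high watermark may only advance when the ISR has not shrunk below
// min.insync.replicas. Simplified; the real check also inspects replica
// fetch offsets and leader-epoch state.
public final class HighWatermarkGate {
    private HighWatermarkGate() { }

    public static boolean mayAdvanceHighWatermark(int isrSize, int minInsyncReplicas) {
        // An under-min-ISR partition must not advance the HW: otherwise
        // records could become visible to consumers before they are
        // replicated to enough brokers to survive a failure.
        return isrSize >= minInsyncReplicas;
    }
}
```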
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
C0urante commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1373831428 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -528,72 +483,64 @@ public void testRestartConnectorAndTasksNoStatus() throws Exception { ExecutionException ee = assertThrows(ExecutionException.class, () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(ee.getCause() instanceof NotFoundException); assertTrue(ee.getMessage().contains("Status for connector")); -PowerMock.verifyAll(); } @Test public void testRestartConnectorAndTasksNoRestarts() throws Exception { RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); -RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); -ConnectorStateInfo connectorStateInfo = PowerMock.createMock(ConnectorStateInfo.class); - EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes(); -EasyMock.expect(herder.buildRestartPlan(restartRequest)) -.andReturn(Optional.of(restartPlan)).anyTimes(); - -connector = PowerMock.createMock(BogusSinkConnector.class); +RestartPlan restartPlan = mock(RestartPlan.class); +ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); +when(restartPlan.shouldRestartConnector()).thenReturn(false); +when(restartPlan.shouldRestartTasks()).thenReturn(false); + when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); + doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); + +connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map connectorConfig = connectorConfig(SourceSink.SINK); -Connector connectorMock = PowerMock.createMock(SinkConnector.class); +Connector connectorMock = mock(SinkConnector.class); 
expectConfigValidation(connectorMock, true, connectorConfig); -PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); FutureCallback restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); -PowerMock.verifyAll(); } @Test public void testRestartConnectorAndTasksOnlyConnector() throws Exception { RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); -RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); -ConnectorStateInfo connectorStateInfo = PowerMock.createMock(ConnectorStateInfo.class); - EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes(); - EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes(); -EasyMock.expect(herder.buildRestartPlan(restartRequest)) -.andReturn(Optional.of(restartPlan)).anyTimes(); +RestartPlan restartPlan = mock(RestartPlan.class); +ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); +when(restartPlan.shouldRestartConnector()).thenReturn(true); +when(restartPlan.shouldRestartTasks()).thenReturn(false); + when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); + doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); herder.onRestart(CONNECTOR_NAME); -EasyMock.expectLastCall(); +verify(statusBackingStore).put(new ConnectorStatus(CONNECTOR_NAME, ConnectorStatus.State.RESTARTING, WORKER_ID, 0)); Review Comment: I think we can just get rid of this part? 
This isn't really relevant to this test case and it doesn't provide super strong guarantees (especially if we explicitly verify that we're emitting the `RESTARTING` state in the status store like suggested a few lines below).
Re: [PR] KAFKA-15598 & KAFKA-15461: Add integration tests for DescribeGroups API, DeleteGroups API, OffsetDelete API and ListGroups API [kafka]
dongnuo123 commented on code in PR #14537: URL: https://github.com/apache/kafka/pull/14537#discussion_r1373831315 ## core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala: ## @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.junit.jupiter.api.Assertions.fail +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testOffsetDelete(true) + } + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testOffsetDelete(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key 
= "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testOffsetDelete(false) + } + + private def testOffsetDelete(useNewProtocol: Boolean): Unit = { +if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") +} + +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +createOffsetsTopic() + +// Create the topic. +createTopic( + topic = "foo", + numPartitions = 3 +) + +for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId, memberEpoch) = joinConsumerGroup( +groupId = "grp", +useNewProtocol = useNewProtocol + ) + + // Commit offsets. + for (partitionId <- 0 to 2) { +commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "foo", + partition = partitionId, + offset = 100L + partitionId, + expectedError = Errors.NONE, + version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) +) + } + + // Delete offset with topic that the group is subscribed to. + deleteOffset( +groupId = "grp", +topic = "foo", +partition = 0, +
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
C0urante commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1373830704 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -528,72 +483,64 @@ public void testRestartConnectorAndTasksNoStatus() throws Exception { ExecutionException ee = assertThrows(ExecutionException.class, () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(ee.getCause() instanceof NotFoundException); assertTrue(ee.getMessage().contains("Status for connector")); -PowerMock.verifyAll(); } @Test public void testRestartConnectorAndTasksNoRestarts() throws Exception { RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); -RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); -ConnectorStateInfo connectorStateInfo = PowerMock.createMock(ConnectorStateInfo.class); - EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes(); -EasyMock.expect(herder.buildRestartPlan(restartRequest)) -.andReturn(Optional.of(restartPlan)).anyTimes(); - -connector = PowerMock.createMock(BogusSinkConnector.class); +RestartPlan restartPlan = mock(RestartPlan.class); +ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); +when(restartPlan.shouldRestartConnector()).thenReturn(false); +when(restartPlan.shouldRestartTasks()).thenReturn(false); + when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); + doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); + +connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map connectorConfig = connectorConfig(SourceSink.SINK); -Connector connectorMock = PowerMock.createMock(SinkConnector.class); +Connector connectorMock = mock(SinkConnector.class); 
expectConfigValidation(connectorMock, true, connectorConfig); -PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); FutureCallback restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); -PowerMock.verifyAll(); } @Test public void testRestartConnectorAndTasksOnlyConnector() throws Exception { RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); -RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); -ConnectorStateInfo connectorStateInfo = PowerMock.createMock(ConnectorStateInfo.class); - EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes(); - EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes(); -EasyMock.expect(herder.buildRestartPlan(restartRequest)) -.andReturn(Optional.of(restartPlan)).anyTimes(); +RestartPlan restartPlan = mock(RestartPlan.class); +ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); +when(restartPlan.shouldRestartConnector()).thenReturn(true); +when(restartPlan.shouldRestartTasks()).thenReturn(false); + when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); + doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); herder.onRestart(CONNECTOR_NAME); -EasyMock.expectLastCall(); +verify(statusBackingStore).put(new ConnectorStatus(CONNECTOR_NAME, ConnectorStatus.State.RESTARTING, WORKER_ID, 0)); -connector = PowerMock.createMock(BogusSinkConnector.class); +doNothing().when(herder).onRestart(CONNECTOR_NAME); Review Comment: Instead of mocking this method on the 
class we're testing, can we let the real method be invoked and add a verification at the end of the test? It could be something like this: ```java ArgumentCaptor connectorStatus = ArgumentCaptor.forClass(ConnectorStatus.class); verify(statusBackingStore).put(connectorStatus.capture()); assertEquals(CONNECTOR_NAME, connectorStatus.getValue().id()); assertEquals(AbstractStatus.State.RESTARTING, connectorStatus.getValue().state()); ``` We can
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373824387 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -267,35 +267,27 @@ class LogManager(logDirs: Seq[File], /** * Retrieves the Uuid for the directory, given its absolute path. */ - def directoryId(dir: String): Option[Uuid] = dirIds.get(dir) + def directoryId(dir: String): Option[Uuid] = directoryIds.get(dir) /** * Determine directory ID for each directory with a meta.properties. - * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties. - * Directories without a meta.properties don't get a directory ID assigned. */ - private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = { -dirs.flatMap { dir => + private def loadDirectoryIds(dirs: Seq[File]): Map[String, Uuid] = { +val result = mutable.HashMap[String, Uuid]() +dirs.foreach(dir => { try { -val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)) -metadataCheckpoint.read().map { props => - val rawMetaProperties = new RawMetaProperties(props) - val uuid = rawMetaProperties.directoryId match { -case Some(uuidStr) => Uuid.fromString(uuidStr) -case None => - val uuid = Uuid.randomUuid() - rawMetaProperties.directoryId = uuid.toString - metadataCheckpoint.write(rawMetaProperties.props) - uuid - } - dir.getAbsolutePath -> uuid -}.toMap +val props = PropertiesUtils.readPropertiesFile( + new File(dir, MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath) +val metaProps = new MetaProperties.Builder(props).build() +metaProps.directoryId().ifPresent(directoryId => { + result += (dir.getAbsolutePath -> directoryId) +}) Review Comment: Yes, I agree it should have a directory ID by this point, during the course of normal operation. By handling the no-ID case, I was trying to avoid unit tests failing. (To be clear, I haven't tested if they do fail, but I thought they might.) 
Maybe we could remove this as a follow-on?
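The `loadDirectoryIds` change being discussed reads each log directory's `meta.properties` and collects a directory ID only when one is present. A hedged stand-alone sketch of that shape, using plain `java.util.Properties` (the property key name and skip-on-error behavior are assumptions for illustration, not the exact `MetaPropertiesEnsemble` semantics):

```java
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

// Sketch of the revised loadDirectoryIds behavior in the diff above: collect
// a directory ID per formatted log dir; directories without a meta.properties
// or without an ID are skipped (no ID is generated here). Illustrative only.
public final class DirectoryIdLoader {
    public static Map<String, UUID> loadDirectoryIds(Iterable<File> dirs) {
        Map<String, UUID> result = new HashMap<>();
        for (File dir : dirs) {
            File metaFile = new File(dir, "meta.properties");
            if (!metaFile.exists()) continue;       // unformatted dir: no ID
            Properties props = new Properties();
            try (FileReader reader = new FileReader(metaFile)) {
                props.load(reader);
            } catch (IOException e) {
                continue;                           // unreadable dir: skip it
            }
            String id = props.getProperty("directory.id"); // assumed key name
            if (id != null) {
                result.put(dir.getAbsolutePath(), UUID.fromString(id));
            }
        }
        return result;
    }
}
```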
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373824387 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -267,35 +267,27 @@ class LogManager(logDirs: Seq[File], /** * Retrieves the Uuid for the directory, given its absolute path. */ - def directoryId(dir: String): Option[Uuid] = dirIds.get(dir) + def directoryId(dir: String): Option[Uuid] = directoryIds.get(dir) /** * Determine directory ID for each directory with a meta.properties. - * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties. - * Directories without a meta.properties don't get a directory ID assigned. */ - private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = { -dirs.flatMap { dir => + private def loadDirectoryIds(dirs: Seq[File]): Map[String, Uuid] = { +val result = mutable.HashMap[String, Uuid]() +dirs.foreach(dir => { try { -val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)) -metadataCheckpoint.read().map { props => - val rawMetaProperties = new RawMetaProperties(props) - val uuid = rawMetaProperties.directoryId match { -case Some(uuidStr) => Uuid.fromString(uuidStr) -case None => - val uuid = Uuid.randomUuid() - rawMetaProperties.directoryId = uuid.toString - metadataCheckpoint.write(rawMetaProperties.props) - uuid - } - dir.getAbsolutePath -> uuid -}.toMap +val props = PropertiesUtils.readPropertiesFile( + new File(dir, MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath) +val metaProps = new MetaProperties.Builder(props).build() +metaProps.directoryId().ifPresent(directoryId => { + result += (dir.getAbsolutePath -> directoryId) +}) Review Comment: Yes, I agree it should have a directory ID by this point, during the course of normal operation. By handling the no-ID case, I was trying to avoid unit tests failing. Maybe we could remove this as a follow-on? -- This is an automated message from the Apache Git Service. 
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373822529 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -407,36 +414,41 @@ object StorageTool extends Logging { if (directories.isEmpty) { throw new TerseFailure("No log directories found in the configuration.") } - -val unformattedDirectories = directories.filter(directory => { - if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) { - true - } else if (!ignoreFormatted) { -throw new TerseFailure(s"Log directory $directory is already formatted. " + - "Use --ignore-formatted to ignore this directory and format the others.") - } else { -false - } -}) -if (unformattedDirectories.isEmpty) { +val loader = new MetaPropertiesEnsemble.Loader() +directories.foreach(loader.addLogDir(_)) +val metaPropertiesEnsemble = loader.load() +metaPropertiesEnsemble.verify(metaProperties.clusterId(), metaProperties.nodeId(), + util.EnumSet.noneOf(classOf[VerificationFlag])) + +val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble) +if (!(ignoreFormatted || copier.logDirProps().isEmpty)) { + val firstLogDir = copier.logDirProps().keySet().iterator().next() + throw new TerseFailure(s"Log directory ${firstLogDir} directory is already formatted. " + +"Use --ignore-formatted to ignore this directory and format the others.") +} +if (!copier.errorLogDirs().isEmpty) { + val firstLogDir = copier.errorLogDirs().iterator().next() + throw new TerseFailure(s"I/O error trying to read log directory ${firstLogDir}.") +} +if (copier.emptyLogDirs().isEmpty) { stream.println("All of the log directories are already formatted.") +} else { + copier.emptyLogDirs().forEach(logDir => { +val newMetaProperties = new MetaProperties.Builder(metaProperties). + setDirectoryId(copier.generateValidDirectoryId()). 
+ build() +copier.logDirProps().put(logDir, newMetaProperties) Review Comment: It seems a bit cumbersome to put accessors on all the maps. But I'm open to ideas. I do wish Java had some way of flagging this map as different from the immutable ones. Kind of like `const` in C++, or yes, the whole menagerie of immutable/mutable Scala classes. Although that comes with its own set of problems.
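The reviewer's wish — flagging a map as read-only the way C++ `const` does — has a partial stdlib answer in Java: `Collections.unmodifiableMap` gives a read-only *view*, and `Map.copyOf` gives an independent immutable *copy*. A small sketch of the distinction:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// The closest Java gets to marking a map immutable, per the discussion above.
// Unlike C++ const, the compiler cannot enforce it; mutation simply fails
// fast at runtime with UnsupportedOperationException.
public final class ImmutableMapDemo {
    private ImmutableMapDemo() { }

    // A wrapper view: reads pass through, writes throw, and later changes
    // to the backing map remain visible through the view.
    public static Map<String, Integer> readOnlyView(Map<String, Integer> m) {
        return Collections.unmodifiableMap(m);
    }

    // An independent immutable snapshot (Java 10+): later changes to the
    // source map do not appear in the copy.
    public static Map<String, Integer> frozenCopy(Map<String, Integer> m) {
        return Map.copyOf(m);
    }
}
```

The view/copy distinction matters for the `Copier` pattern in the diff: an unmodifiable view of `logDirProps()` would still track internal mutations, while `Map.copyOf` would freeze a snapshot.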
[jira] [Updated] (KAFKA-15691) Upgrade existing and add new system tests to use new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15691: -- Summary: Upgrade existing and add new system tests to use new consumer (was: Upgrade existing and add new system tests to use new coordinator) > Upgrade existing and add new system tests to use new consumer > - > > Key: KAFKA-15691 > URL: https://issues.apache.org/jira/browse/KAFKA-15691 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Minor > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview >
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373820141 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -228,23 +226,26 @@ class KafkaServer( info(s"Cluster ID = $clusterId") /* load metadata */ -val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = - BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, ignoreMissing = true, kraftMode = false) - -if (preloadedBrokerMetadataCheckpoint.version != 0) { - throw new RuntimeException(s"Found unexpected version in loaded `meta.properties`: " + -s"$preloadedBrokerMetadataCheckpoint. Zk-based brokers only support version 0 " + -"(which is implicit when the `version` field is missing).") +val initialMetaPropsEnsemble = { + val loader = new MetaPropertiesEnsemble.Loader() + config.logDirs.foreach(loader.addLogDir(_)) + loader.load() } Review Comment: yes. good catch. @pprovenzano also found this bug through testing :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373816778 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogDirFailureChannel.java: ## @@ -57,7 +56,7 @@ public boolean hasOfflineLogDir(String logDir) { * @param msg Error message. * @param e Exception instance. */ -public void maybeAddOfflineLogDir(String logDir, String msg, IOException e) { +public void maybeAddOfflineLogDir(String logDir, String msg, Exception e) { Review Comment: Fair enough. I changed it above to just catch IOE so I will change it here as well.
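The method under discussion records a log directory as offline at most once, so only the first failure triggers follow-up handling. A hedged sketch of that "first failure wins" pattern — a simplified stand-in, not the actual `LogDirFailureChannel`:

```java
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the maybeAddOfflineLogDir idea from the diff above: mark a log
// directory offline exactly once, even under concurrent failures.
// Illustrative; the real class also queues the dir for a handler thread.
public class OfflineDirTracker {
    private final ConcurrentHashMap<String, String> offlineLogDirs = new ConcurrentHashMap<>();

    /** Returns true only for the first caller to mark this dir offline. */
    public boolean maybeAddOfflineLogDir(String logDir, String msg, IOException cause) {
        // putIfAbsent is atomic: if two threads fail on the same dir at once,
        // exactly one observes null and "wins" the right to react.
        return offlineLogDirs.putIfAbsent(logDir, msg + ": " + cause) == null;
    }

    public boolean hasOfflineLogDir(String logDir) {
        return offlineLogDirs.containsKey(logDir);
    }
}
```

The review is about narrowing the cause parameter back from `Exception` to `IOException`; the tracking logic itself is unchanged by that signature choice.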
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373817350 ## core/src/main/scala/kafka/server/KafkaRaftServer.scala: ## @@ -135,39 +136,52 @@ object KafkaRaftServer { * @return A tuple containing the loaded meta properties (which are guaranteed to * be consistent across all log dirs) and the offline directories */ - def initializeLogDirs(config: KafkaConfig): (MetaProperties, BootstrapMetadata, Seq[String]) = { -val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq -val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint. - getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false, kraftMode = true) - -if (offlineDirs.contains(config.metadataLogDir)) { - throw new KafkaException("Cannot start server since `meta.properties` could not be " + -s"loaded from ${config.metadataLogDir}") + def initializeLogDirs(config: KafkaConfig): (MetaPropertiesEnsemble, BootstrapMetadata) = { +// Load and verify the original ensemble. +val loader = new MetaPropertiesEnsemble.Loader() +loader.addMetadataLogDir(config.metadataLogDir) +config.logDirs.foreach(loader.addLogDir(_)) +val initialMetaPropsEnsemble = loader.load() +initialMetaPropsEnsemble.emptyLogDirs().forEach(logDir => { + throw new RuntimeException(s"No `meta.properties` found in $logDir (have you run `kafka-storage.sh` " + +"to format the directory?)") +}) +val verificationFlags = if (config.migrationEnabled) { + util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR) +} else { + util.EnumSet.of(REQUIRE_V0, REQUIRE_AT_LEAST_ONE_VALID) } +initialMetaPropsEnsemble.verify(Optional.empty(), OptionalInt.of(config.nodeId), verificationFlags); +// Check that the __cluster_metadata-0 topic does not appear outside the metadata directory. 
val metadataPartitionDirName = UnifiedLog.logDirName(MetadataPartition) -val onlineNonMetadataDirs = logDirs.diff(offlineDirs :+ config.metadataLogDir) -onlineNonMetadataDirs.foreach { logDir => - val metadataDir = new File(logDir, metadataPartitionDirName) - if (metadataDir.exists) { -throw new KafkaException(s"Found unexpected metadata location in data directory `$metadataDir` " + - s"(the configured metadata directory is ${config.metadataLogDir}).") +initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir => { + if (!logDir.equals(config.metadataLogDir)) { +val clusterMetadataTopic = new File(logDir, metadataPartitionDirName) +if (clusterMetadataTopic.exists) { + throw new KafkaException(s"Found unexpected metadata location in data directory `$clusterMetadataTopic` " + +s"(the configured metadata directory is ${config.metadataLogDir}).") +} } -} - -val metaProperties = MetaProperties.parse(rawMetaProperties) -if (config.nodeId != metaProperties.nodeId) { - throw new InconsistentNodeIdException( -s"Configured node.id `${config.nodeId}` doesn't match stored node.id `${metaProperties.nodeId}' in " + - "meta.properties. If you moved your data, make sure your configured controller.id matches. " + - "If you intend to create a new broker, you should remove all data in your data directories (log.dirs).") -} +}) + +// Set directory IDs on all directories. Rewrite the files if needed. Review Comment: Yeah. Maybe eventually we'll also auto-upgrade from v0 -> v1 here (once we're not in migration mode any more). v0 is quite annoying since there are basically no required fields at all. But one step at a time... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]
chb2ab commented on code in PR #14627: URL: https://github.com/apache/kafka/pull/14627#discussion_r1373815620 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -360,7 +360,9 @@ public short partitionRecordVersion() { } public short fetchRequestVersion() { -if (this.isAtLeast(IBP_3_5_IV1)) { +if (this.isAtLeast(IBP_3_7_IV0)) { Review Comment: Oh yeah, we shouldn't need any other changes; the fields are all tagged and not used anywhere, so we can leave them at their default values. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373815484 ## metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.properties; + +/** + * The version of a meta.properties file. 
+ */ +public enum MetaPropertiesVersion { +V0(0), +V1(1); + +private final int number; + +public static MetaPropertiesVersion fromNumberString(String numberString) { +int number; +try { +number = Integer.parseInt(numberString.trim()); +} catch (NumberFormatException e) { +throw new RuntimeException("Invalid meta.properties version string '" + +numberString + "'"); +} +return fromNumber(number); +} + +public static MetaPropertiesVersion fromNumber(int number) { +switch (number) { +case 0: return V0; +case 1: return V1; +default: throw new RuntimeException("Unknown meta.properties version number " + number); +} +} + +MetaPropertiesVersion(int number) { +this.number = number; +} + +public int number() { +return number; +} + +public String numberString() { +return "" + number; +} + +public boolean hasBrokerId() { +return this == V0; +} + +public boolean alwaysHasId() { Review Comment: yes, let's rename it to `alwaysHasNodeId` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]
chb2ab commented on code in PR #14627: URL: https://github.com/apache/kafka/pull/14627#discussion_r1373814111 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -53,7 +53,9 @@ // // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) - "validVersions": "0-15", + // + // Version 16 is the same as version 15. + "validVersions": "0-16", Review Comment: ok, added the label -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373813203 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -227,8 +231,12 @@ public KafkaClusterTestKit build() throws Exception { setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList()); BootstrapMetadata bootstrapMetadata = BootstrapMetadata. fromVersion(nodes.bootstrapMetadataVersion(), "testkit"); +MetaPropertiesEnsemble metaPropertiesEnsemble = new MetaPropertiesEnsemble.Loader(). +setLoadMissingBehavior(LoadMissingBehavior.EXCEPTION). +addMetadataLogDir(node.metadataDirectory()). Review Comment: Yes, this was left over from some earlier code, sorry. Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373812595 ## metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java: ## @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.properties; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.function.BiConsumer; + +/** + * A collection of meta.properties information for Kafka log directories. + * + * Directories are categorized into empty, error, and normal. 
Each directory must appear in only + * one category, corresponding to emptyLogDirs, errorLogDirs, and logDirProps. + * + * This class is immutable. Modified copies can be made with the Copier class. + */ +public class MetaPropertiesEnsemble { +/** + * The log4j object for this class. + */ +private static final Logger LOG = LoggerFactory.getLogger(MetaPropertiesEnsemble.class); + +/** + * A completely empty MetaPropertiesEnsemble object. + */ +public static final MetaPropertiesEnsemble EMPTY = new MetaPropertiesEnsemble(Collections.emptySet(), +Collections.emptySet(), +Collections.emptyMap(), +Optional.empty()); + +/** + * The name of the meta.properties file within each log directory. + */ +public static final String META_PROPERTIES_NAME = "meta.properties"; + +/** + * The set of log dirs that were empty. + */ +private final Set emptyLogDirs; + +/** + * The set of log dirs that had errors. + */ +private final Set errorLogDirs; + +/** + * A map from log directories to the meta.properties information inside each one. + */ +private final Map logDirProps; + +/** + * The metadata log directory, or the empty string if there is none. + */ +private final Optional metadataLogDir; + +/** + * Utility class for loading a MetaPropertiesEnsemble from the disk. + */ +public static class Loader { +private final TreeSet logDirs = new TreeSet<>(); +private Optional metadataLogDir = Optional.empty(); + +public Loader addLogDirs(Collection logDirs) { +for (String logDir : logDirs) { +this.logDirs.add(logDir); +} +return this; +} + +public Loader addLogDir(String logDir) { +this.logDirs.add(logDir); +return this; +} + +public Loader addMetadataLogDir(String metadataLogDir) { +if (this.metadataLogDir.isPresent()) { +throw new RuntimeException("Cannot specify more than one metadata log directory. 
" + +"Already specified " + this.metadataLogDir.get()); +} +this.metadataLogDir = Optional.of(metadataLogDir); +logDirs.add(metadataLogDir); +return this; +} + +public MetaPropertiesEnsemble load() throws IOException { +if (logDirs.isEmpty()) { +throw new RuntimeException("You must specify at least one log directory."); +} +Set emptyLogDirs = new HashSet<>(); +Set errorLogDirs = new HashSet<>(); +Map logDirProps = new HashMap<>(); +for (String logDir : logDirs) { +String metaPropsFile = new File(logDir, META_PROPERTIES_NAME).getAbsolutePath(); +try { +Properties props = PropertiesUtils.readPropertiesFile(metaPropsFile); +
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373810872 ## metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.properties; + +import org.apache.kafka.common.utils.Utils; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Properties; + +public final class PropertiesUtils { +/** + * Writes a Java Properties object to a file. + * + * @param props The Properties object. + * @param path The file to write to. 
+ * @throws IOException + */ +public static void writePropertiesFile( +Properties props, +String path +) throws IOException { +File tempFile = new File(path + ".tmp"); +try ( +FileOutputStream fos = new FileOutputStream(tempFile, false); +OutputStreamWriter osw = new OutputStreamWriter(fos, StandardCharsets.UTF_8); +PrintWriter pw = new PrintWriter(osw) +) { +props.store(pw, ""); +fos.flush(); +fos.getFD().sync(); Review Comment: In some filesystems, if you don't fsync the parent directory, the file can be lost if the machine loses power. This is again something ext3 was famous for: letting you sync files to disk, but then making them unreachable because you didn't sync the directory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
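The crash-consistency sequence discussed here (write to a temp file, fsync it, rename into place, then fsync the parent directory so the rename itself survives power loss) can be sketched in plain Java. This is a hedged illustration, not Kafka's actual `PropertiesUtils` implementation; the class and method names below are hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Properties;

public final class AtomicPropsWrite {

    /** Write props to path atomically: temp file + fsync + atomic rename + parent dir fsync. */
    public static void writeAtomically(Properties props, String path) throws IOException {
        File target = new File(path);
        File tempFile = new File(path + ".tmp");
        try (FileOutputStream fos = new FileOutputStream(tempFile, false);
             OutputStreamWriter osw = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
             PrintWriter pw = new PrintWriter(osw)) {
            props.store(pw, "");
            pw.flush();          // drain the writer chain into the stream
            fos.getFD().sync();  // force the file contents to disk
        }
        // Atomically replace the target with the fully-written temp file.
        Files.move(tempFile.toPath(), target.toPath(), StandardCopyOption.ATOMIC_MOVE);
        // fsync the parent directory so the rename itself is durable; without this,
        // some filesystems (ext3, notoriously) can lose the directory entry on power failure.
        Path parent = target.toPath().toAbsolutePath().getParent();
        try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
            dir.force(true);
        } catch (IOException e) {
            // Directory fsync is not supported on all platforms (e.g. Windows); best-effort.
        }
    }

    public static void main(String[] args) throws IOException {
        Properties p = new Properties();
        p.setProperty("version", "1");
        Path dir = Files.createTempDirectory("props-demo");
        String path = dir.resolve("meta.properties").toString();
        writeAtomically(p, path);
        System.out.println(Files.exists(Paths.get(path)));
    }
}
```

Until the rename happens, readers only ever see the old complete file or no file, never a half-written one, which is the property a `meta.properties` loader depends on.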
Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]
cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373808533 ## metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java: ## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.properties; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Properties; + +/** + * An immutable class which contains the per-log-directory information stored in an individual + * meta.properties file. + */ +public final class MetaProperties { Review Comment: Hmm, good question. I guess I think it does belong here. The `:metadata` module in Gradle is about more than just the controller. It's about how Kafka handles metadata in general, and has things like node registrations, metadata publishing and loading, etc. I think this is in keeping with that. Also, independently of all that, we should all be striving to shrink the `:core` module, not add more stuff there. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1373793336 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Yes. I'm having trouble speaking today. Map in Scala is immutable. 🤦‍♀️ Apologies for the confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15598 & KAFKA-15461: Add integration tests for DescribeGroups API, DeleteGroups API, OffsetDelete API and ListGroups API [kafka]
dongnuo123 commented on code in PR #14537: URL: https://github.com/apache/kafka/pull/14537#discussion_r1373787330 ## core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala: ## @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.ListGroupsResponseData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testListGroupsWithNewProtocol() + } + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def 
testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testListGroupsWithOldProtocol() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testListGroupsWithOldProtocol() + } + + private def testListGroupsWithNewProtocol(): Unit = { +if (!isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") +} + +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +createOffsetsTopic() + +// Create the topic. +createTopic( + topic = "foo", + numPartitions = 3 +) + +for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinConsumerGroup("grp", true) + + checkListedGroups( +groupId = "grp", +state = ConsumerGroupState.STABLE.toString, +statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.ASSIGNING.toString), +version = version + ) + + // Member 2 joins the group, triggering a rebalance. + val (memberId2, _) = joinConsumerGroup("grp", true) + + checkListedGroups( +groupId = "grp", +state = ConsumerGroupState.RECONCILING.toString, +
Re: [PR] KAFKA-15583: Enforce HWM advance only if partition is not under-min-ISR With ELR [kafka]
artemlivshits commented on code in PR #14594: URL: https://github.com/apache/kafka/pull/14594#discussion_r1373758559 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -1106,6 +1115,12 @@ class Partition(val topicPartition: TopicPartition, * @return true if the HW was incremented, and false otherwise. */ private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = { +if (metadataCache.isInstanceOf[KRaftMetadataCache] && interBrokerProtocolVersion.isElrSupported && eligibleLeaderReplicasEnabled) { Review Comment: I have a higher-level question -- do we actually care if ELR is enabled for this logic? It is my understanding that the logic on the brokers doesn't use ELR; it just has some changes that seem to improve the protocol even if ELR is not used / implemented: - strict min ISR - advance HWM only if min ISR is known to the controller -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
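The second protocol improvement mentioned above can be illustrated with a toy Java sketch. All names here are hypothetical stand-ins; real Kafka tracks the high watermark through log offset metadata and ISR state on the partition, not bare primitives.

```java
public final class HwmRuleSketch {

    /**
     * Toy version of the rule under discussion: the leader only advances the
     * high watermark when the ISR satisfies min.insync.replicas. An
     * under-min-ISR partition holds its HWM instead, so records are not
     * exposed to consumers while the partition lacks sufficient replication.
     */
    public static long maybeAdvanceHighWatermark(long currentHwm,
                                                 long candidateHwm,
                                                 int isrSize,
                                                 int minIsr) {
        if (isrSize < minIsr) {
            return currentHwm; // under min ISR: do not advance
        }
        return Math.max(currentHwm, candidateHwm); // HWM never moves backwards
    }

    public static void main(String[] args) {
        System.out.println(maybeAdvanceHighWatermark(100L, 150L, 3, 2)); // 150
        System.out.println(maybeAdvanceHighWatermark(100L, 150L, 1, 2)); // 100
    }
}
```

The sketch also captures why the reviewer asks whether ELR gating matters: nothing in this rule itself references ELR, only ISR size versus the configured minimum.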
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
wcarlson5 merged PR #14575: URL: https://github.com/apache/kafka/pull/14575 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
wcarlson5 commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1373750580 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + +/** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. 
+ */ +long timeToNextUpdate(long timeoutMs); + +/** + * Return the telemetry request based on client state i.e. determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ +Optional> createRequest(); + +/** + * Handle successful response for get telemetry subscriptions request. + * + * @param response subscriptions telemetry API response + */ +void handleResponse(GetTelemetrySubscriptionsResponse response); + +/** + * Handle successful response for push telemetry request. + * + * @param response push telemetry API response + */ +void handleResponse(PushTelemetryResponse response); + +/** + * Handle response for failed get telemetry subscriptions request. + * + * @param kafkaException the fatal exception. + */ +void handleFailedGetTelemetrySubscriptionsRequest(KafkaException kafkaException); Review Comment: I think this is fine as it is. `KafkaException` should be broad enough. ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { + +private final String name; +private final Map tags; + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + */ +public MetricKey(String name) { +this(name, null); +} + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + * @param tags mapping of tag
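As a rough illustration of how a `NetworkClient`-style poll loop might drive the `ClientTelemetrySender` contract above, here is a minimal, self-contained sketch. The `Sender` interface and `StubSender` class are hypothetical stand-ins, not the PR's actual classes: a plain `String` request type replaces `Builder<?>`, and the subscribe-then-push ordering is assumed from the javadoc, not taken from the real implementation.

```java
import java.util.Optional;

// Hypothetical, simplified mirror of the ClientTelemetrySender contract.
class TelemetrySenderSketch {

    // Stand-in for the real interface: a String request name replaces Builder<?>.
    interface Sender {
        long timeToNextUpdate(long timeoutMs); // 0 means "send now"
        Optional<String> createRequest();
    }

    // Stub that issues GetTelemetrySubscriptions first, then PushTelemetry,
    // mimicking the subscribe-then-push ordering described in the javadoc.
    static class StubSender implements Sender {
        private boolean subscribed = false;

        @Override
        public long timeToNextUpdate(long timeoutMs) {
            return 0; // always ready in this sketch; real code returns the remaining interval
        }

        @Override
        public Optional<String> createRequest() {
            if (!subscribed) {
                subscribed = true;
                return Optional.of("GetTelemetrySubscriptionsRequest");
            }
            return Optional.of("PushTelemetryRequest");
        }
    }

    public static void main(String[] args) {
        Sender sender = new StubSender();
        // Poll-loop shape: only create a request once the interval has elapsed.
        if (sender.timeToNextUpdate(100L) == 0) {
            System.out.println(sender.createRequest().orElse("none")); // prints "GetTelemetrySubscriptionsRequest"
        }
        System.out.println(sender.createRequest().orElse("none")); // prints "PushTelemetryRequest"
    }
}
```

The point of the two `handleResponse` overloads in the real interface is that the poll loop stays generic: it only asks "when" and "what", and response handling feeds the sender's internal state for the next cycle.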
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
alok123t commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1781801313

@junrao From a quick look, I think the test failures are unrelated to this PR. Not sure if these are known flaky tests; I will wait for another run from the latest commit.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
alok123t commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1373738648

## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java:
## @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+public class PartitionMetadata {
+
+    private final int version;
+    private final Uuid topicId;
+
+    public PartitionMetadata(int version, Uuid topicId) {
+        this.version = version;
+        this.topicId = topicId;
+    }
+
+    public int version() {
+        return version;
+    }
+
+    public Uuid topicId() {
+        return topicId;
+    }
+
+    public String toText() {

Review Comment:
   Updated to encode in https://github.com/apache/kafka/pull/14607/commits/f2d05e64314401142d1386f031b45b42425d5a93. It would be better to use the `Formatter` interface from `LeaderEpochCheckpointFile`; we can do that in a follow-up PR.
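For context on the `toText`/encode discussion above, here is a minimal sketch of a text round-trip for the version/topic-id pair. The two-line `version:`/`topic_id:` layout is assumed for illustration, and the `PartitionMetadataCodec` name is hypothetical; the sketch also uses `java.util.UUID` in place of Kafka's `Uuid` to stay self-contained.

```java
import java.util.UUID;

// Hypothetical codec sketch: encodes and decodes a PartitionMetadata-like
// (version, topicId) pair as two "key: value" lines.
class PartitionMetadataCodec {

    static String encode(int version, UUID topicId) {
        return "version: " + version + "\ntopic_id: " + topicId;
    }

    static UUID decodeTopicId(String text) {
        // Scan line by line so field ordering does not matter.
        for (String line : text.split("\n")) {
            if (line.startsWith("topic_id: "))
                return UUID.fromString(line.substring("topic_id: ".length()));
        }
        throw new IllegalArgumentException("no topic_id line in: " + text);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        String text = encode(0, id);
        if (!decodeTopicId(text).equals(id))
            throw new AssertionError("round trip failed");
        System.out.println(text);
    }
}
```

Centralizing the parse/format pair in one place (as the `Formatter` interface mentioned in the review would) keeps the on-disk layout from drifting between the reader and the writer.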
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on PR #14621:
URL: https://github.com/apache/kafka/pull/14621#issuecomment-1781794070

> @apoorvmittal10 : Yes, we are gradually moving the codebase to java. Ideally, all new classes should be written in java.

Thanks @junrao, I'll update the PR by tomorrow with the new classes in Java.
[jira] [Updated] (KAFKA-15574) Update states and transitions for membership manager state machine
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15574:
------------------------------
Description:
This task is to update the state machine so that it correctly acts as the glue between the heartbeat request manager and the assignment reconciler. The state machine will transition from one state to another in response to heartbeats, callback completion, errors, unsubscribing, and other external events. A given transition may kick off one or more actions that are implemented outside of the state machine.

Steps:
# Update the set of states in the code as [defined in the diagrams on the wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
# Ensure the correct state transitions are performed in response to external input
# _Define_ any actions that should be taken as a result of the above transitions, but defer the _implementation_ to separate Jiras/PRs as much as possible

was:
This task includes:
# Updating the states in the code as [defined in the diagrams on the wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
# Implementing the state transitions in response to heartbeat responses, callbacks, unsubscribing, etc.
# _Defining_ the actions that should be taken as a result of the transitions; deferring the _implementation_ of as many actions as reasonably possible to separate Jiras/PRs

The state machine ties the heartbeat request manager and the partition assignment reconciler together: the task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions.

The HB-reconciler interaction is two-fold:
* The HB manager should send a heartbeat request when the reconciler completes callbacks
* The HB manager needs to trigger the reconciler to release assignments when errors occur

All driven by the HB manager.

> Update states and transitions for membership manager state machine
> ------------------------------------------------------------------
>
>                 Key: KAFKA-15574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15574
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: kip-848, kip-848-client-support, kip-848-e2e, kip-848-preview
>
> This task is to update the state machine so that it correctly acts as the glue between the heartbeat request manager and the assignment reconciler. The state machine will transition from one state to another in response to heartbeats, callback completion, errors, unsubscribing, and other external events. A given transition may kick off one or more actions that are implemented outside of the state machine.
>
> Steps:
> # Update the set of states in the code as [defined in the diagrams on the wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
> # Ensure the correct state transitions are performed in response to external input
> # _Define_ any actions that should be taken as a result of the above transitions, but defer the _implementation_ to separate Jiras/PRs as much as possible

-- 
This message was sent by Atlassian Jira (v8.20.10#820010)
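The transition-validation idea described in this task can be sketched as a small table-driven state machine. The state names below loosely follow the rebalance diagram on the wiki, but the exact transition table and the `MembershipStateSketch` class are assumptions for illustration, not the actual `MembershipManager` implementation.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Illustrative table-driven state machine: every external event (heartbeat
// response, callback completion, error, unsubscribe) funnels through one
// validated transitionTo() call.
class MembershipStateSketch {

    enum State { UNJOINED, JOINING, STABLE, RECONCILING, FENCED, FAILED }

    // Assumed transition table for the sketch; the real set of edges lives
    // in the wiki diagram referenced by the Jira.
    private static final Map<State, Set<State>> ALLOWED = Map.of(
        State.UNJOINED,    EnumSet.of(State.JOINING),
        State.JOINING,     EnumSet.of(State.STABLE, State.RECONCILING, State.FENCED, State.FAILED),
        State.RECONCILING, EnumSet.of(State.STABLE, State.FENCED, State.FAILED),
        State.STABLE,      EnumSet.of(State.RECONCILING, State.FENCED, State.UNJOINED),
        State.FENCED,      EnumSet.of(State.JOINING, State.UNJOINED),
        State.FAILED,      EnumSet.noneOf(State.class));

    private State state = State.UNJOINED;

    // Reject transitions the table does not allow, so illegal event orderings
    // surface immediately instead of corrupting membership state.
    void transitionTo(State next) {
        if (!ALLOWED.get(state).contains(next))
            throw new IllegalStateException(state + " -> " + next + " not allowed");
        state = next;
    }

    State state() {
        return state;
    }
}
```

Keeping the legal edges in one table is what lets the actions ("send heartbeat", "release assignment") stay outside the machine, as the description proposes: callers observe a transition and react, but cannot force an invalid one.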
[jira] [Updated] (KAFKA-15574) Update states and transitions for membership manager state machine
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15574:
------------------------------
Summary: Update states and transitions for membership manager state machine  (was: Implement heartbeat membership manager state machine)

> Update states and transitions for membership manager state machine
> ------------------------------------------------------------------
>
>                 Key: KAFKA-15574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15574
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: kip-848, kip-848-client-support, kip-848-e2e, kip-848-preview
>
> This task includes:
> # Updating the states in the code as [defined in the diagrams on the wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
> # Implementing the state transitions in response to heartbeat responses, callbacks, unsubscribing, etc.
> # _Defining_ the actions that should be taken as a result of the transitions; deferring the _implementation_ of as many actions as reasonably possible to separate Jiras/PRs
>
> The state machine ties the heartbeat request manager and the partition assignment reconciler together: the task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions.
>
> The HB-reconciler interaction is two-fold:
> * The HB manager should send a heartbeat request when the reconciler completes callbacks
> * The HB manager needs to trigger the reconciler to release assignments when errors occur
>
> All driven by the HB manager.

-- 
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15574) Implement heartbeat membership manager state machine
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15574:
------------------------------
Description:
This task includes:
# Updating the states in the code as [defined in the diagrams on the wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
# Implementing the state transitions in response to heartbeat responses, callbacks, unsubscribing, etc.
# _Defining_ the actions that should be taken as a result of the transitions; deferring the _implementation_ of as many actions as reasonably possible to separate Jiras/PRs

The state machine ties the heartbeat request manager and the partition assignment reconciler together: the task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions.

The HB-reconciler interaction is two-fold:
* The HB manager should send a heartbeat request when the reconciler completes callbacks
* The HB manager needs to trigger the reconciler to release assignments when errors occur

All driven by the HB manager.

was:
This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions.

The HB-reconciler interaction is two-fold:
* The HB manager should send a heartbeat request when the reconciler completes callbacks
* The HB manager needs to trigger the reconciler to release assignments when errors occur

All driven by the HB manager.

> Implement heartbeat membership manager state machine
> ----------------------------------------------------
>
>                 Key: KAFKA-15574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15574
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: kip-848, kip-848-client-support, kip-848-e2e, kip-848-preview
>
> This task includes:
> # Updating the states in the code as [defined in the diagrams on the wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
> # Implementing the state transitions in response to heartbeat responses, callbacks, unsubscribing, etc.
> # _Defining_ the actions that should be taken as a result of the transitions; deferring the _implementation_ of as many actions as reasonably possible to separate Jiras/PRs
>
> The state machine ties the heartbeat request manager and the partition assignment reconciler together: the task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions.
>
> The HB-reconciler interaction is two-fold:
> * The HB manager should send a heartbeat request when the reconciler completes callbacks
> * The HB manager needs to trigger the reconciler to release assignments when errors occur
>
> All driven by the HB manager.

-- 
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15574) Implement heartbeat membership manager state machine
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15574:
------------------------------
Summary: Implement heartbeat membership manager state machine  (was: Integrate partition assignment reconciliation with heartbeat request manager)

> Implement heartbeat membership manager state machine
> ----------------------------------------------------
>
>                 Key: KAFKA-15574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15574
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: kip-848, kip-848-client-support, kip-848-e2e, kip-848-preview
>
> This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions.
>
> The HB-reconciler interaction is two-fold:
> * The HB manager should send a heartbeat request when the reconciler completes callbacks
> * The HB manager needs to trigger the reconciler to release assignments when errors occur
>
> All driven by the HB manager.

-- 
This message was sent by Atlassian Jira (v8.20.10#820010)