[jira] [Assigned] (KAFKA-16232) kafka hangs forever in the starting process if the authorizer future is not returned
[ https://issues.apache.org/jira/browse/KAFKA-16232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-16232: - Assignee: Kuan-Po Tseng > kafka hangs forever in the starting process if the authorizer future is not returned > Key: KAFKA-16232 > URL: https://issues.apache.org/jira/browse/KAFKA-16232 > Project: Kafka > Issue Type: Improvement > Affects Versions: 3.6.1 > Reporter: Luke Chen > Assignee: Kuan-Po Tseng > Priority: Major > For security reasons, during broker startup we wait until all ACL entries are loaded before serving requests. But recently, we accidentally configured the StandardAuthorizer on a ZK broker, and the broker never entered the RUNNING state because it was waiting for the StandardAuthorizer future to complete. Of course, setting the wrong configuration is a human error, but it would be better if we could handle this case more gracefully. Suggestions: 1. Set a timeout for waiting on the authorizer future (how long is long enough?). 2. Add logs before and after waiting on the future, so that admins know we are waiting for the authorizer future. We can start with (2) and think about (1) later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
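The two suggestions can be combined in a small plain-Java sketch. It is not the broker's startup code: `AuthorizerStartupWait`, `awaitAuthorizerFutures`, and the endpoint-name keys are hypothetical, `java.util.logging` stands in for the broker's logger, and the map of futures only loosely mirrors the per-endpoint completion stages returned by `Authorizer.start`. It logs before and after the wait (suggestion 2) and bounds the wait with a caller-supplied timeout (suggestion 1), returning `false` instead of blocking forever. Choosing the timeout value remains the open question from the issue.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

// Hypothetical helper; not part of Kafka's API.
class AuthorizerStartupWait {
    private static final Logger LOG = Logger.getLogger(AuthorizerStartupWait.class.getName());

    /**
     * Waits for every per-endpoint authorizer future, logging before and after the wait.
     * Returns true if all futures completed within timeoutMs, false on timeout.
     */
    static boolean awaitAuthorizerFutures(Map<String, CompletableFuture<Void>> futures, long timeoutMs) {
        LOG.info("Waiting for authorizer futures of endpoints " + futures.keySet());
        CompletableFuture<Void> all =
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
            LOG.info("Finished waiting for authorizer futures");
            return true;
        } catch (TimeoutException e) {
            // Report which endpoints are still blocking startup.
            futures.forEach((endpoint, future) -> {
                if (!future.isDone()) {
                    LOG.warning("Authorizer future for endpoint " + endpoint
                        + " not completed after " + timeoutMs + " ms");
                }
            });
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for authorizer futures", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Authorizer failed to start", e.getCause());
        }
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> ready = Map.of("PLAINTEXT", CompletableFuture.completedFuture(null));
        Map<String, CompletableFuture<Void>> stuck = Map.of("PLAINTEXT", new CompletableFuture<>());
        System.out.println(awaitAuthorizerFutures(ready, 100)); // true
        System.out.println(awaitAuthorizerFutures(stuck, 100)); // false, after roughly 100 ms
    }
}
```

Returning a boolean leaves the policy (fail startup, or keep waiting with periodic logs) to the caller, which fits the issue's plan of adding logging first and revisiting the timeout later.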
[PR] kafka-connect configuration file uses UTF-8 character set. [kafka]
pedoc opened a new pull request, #15456: URL: https://github.com/apache/kafka/pull/15456 Currently, when ``kafka-connect`` configuration files (such as ``connect-standalone.properties``) are used in non-English locales and contain non-ASCII characters, those characters are not read correctly. ### Motivation Specifically, when we use ``debezium`` (https://debezium.io/), we need to specify the excluded database tables through configuration. Our database table names are in Chinese, and because the configuration is not read with ``UTF-8`` encoding by default, the values become garbled after being read and the tables cannot be excluded correctly. ``debezium`` uses the ``kafka-clients`` package directly, so this cannot be solved in ``debezium``. ### Note ``UTF-8`` encoding does not break reading of ``ASCII`` text, so should this be backported to earlier versions? ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
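The underlying JDK behavior can be shown with a short sketch: `Properties.load(InputStream)` decodes bytes as ISO 8859-1, while `Properties.load(Reader)` uses whatever charset the `Reader` was created with. The sketch assumes the configuration is read through the `InputStream` overload (the PR describes the symptom, not the exact call site). The class name, the key, and the table name (written with Unicode escapes so the source stays ASCII) are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Illustrative only: compares how the two Properties.load overloads decode non-ASCII bytes.
class Utf8PropertiesDemo {
    // InputStream overload: each byte is decoded as ISO 8859-1, so multi-byte UTF-8 text is garbled.
    static Properties loadAsLatin1(byte[] content) {
        Properties props = new Properties();
        try {
            props.load(new ByteArrayInputStream(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reader overload with an explicit UTF-8 decoder: non-ASCII values survive intact.
    static Properties loadAsUtf8(byte[] content) {
        Properties props = new Properties();
        try (Reader reader = new InputStreamReader(new ByteArrayInputStream(content), StandardCharsets.UTF_8)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical key; the value is a two-character Chinese table name (U+8BA2 U+5355).
        byte[] file = "table.exclude.list=inventory.\u8ba2\u5355\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(loadAsLatin1(file).getProperty("table.exclude.list")); // garbled
        System.out.println(loadAsUtf8(file).getProperty("table.exclude.list"));   // table name intact
    }
}
```

For pure ASCII input both loaders return identical values, which matches the PR's note that switching to UTF-8 does not break ASCII configuration files.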
[jira] [Commented] (KAFKA-16232) kafka hangs forever in the starting process if the authorizer future is not returned
[ https://issues.apache.org/jira/browse/KAFKA-16232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822739#comment-17822739 ] Kuan-Po Tseng commented on KAFKA-16232: --- Thank you [~showuon], but I can't assign this issue to myself. Could you help me? Thanks again!
[jira] [Commented] (KAFKA-16232) kafka hangs forever in the starting process if the authorizer future is not returned
[ https://issues.apache.org/jira/browse/KAFKA-16232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822736#comment-17822736 ] Luke Chen commented on KAFKA-16232: --- Yes, please. Thanks for picking it up [~brandboat]!
[jira] [Created] (KAFKA-16321) Default directory ids to MIGRATING, not UNASSIGNED
Colin McCabe created KAFKA-16321: Summary: Default directory ids to MIGRATING, not UNASSIGNED Key: KAFKA-16321 URL: https://issues.apache.org/jira/browse/KAFKA-16321 Project: Kafka Issue Type: Bug Reporter: Colin McCabe Assignee: Colin McCabe Directory ids should be defaulted to MIGRATING, not UNASSIGNED.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
kirktrue commented on PR #15426: URL: https://github.com/apache/kafka/pull/15426#issuecomment-1974135452 > @kirktrue are you fine with merging this PR and coming back to this after 3.8? Yes. I think this is an area that needs a more holistic design review, unfortunately.
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
mjsax commented on code in PR #15414: URL: https://github.com/apache/kafka/pull/15414#discussion_r1509703745 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java: ## @@ -203,13 +202,27 @@ public void registerStore(final StateStore store, ); try { -restoreState( -stateRestoreCallback, -topicPartitions, -highWatermarks, -store.name(), -converterForStore(store) -); +if (topology.storeNameToReprocessOnRestore().getOrDefault(store.name(), false)) { +globalConsumer.assign(topicPartitions); +globalConsumer.seekToBeginning(topicPartitions); +for (final TopicPartition topicPartition : topicPartitions) { +stateRestoreListener.onRestoreStart(topicPartition, store.name(), +checkpointFileCache.getOrDefault(topicPartition, 0L), +checkpointFileCache.getOrDefault(topicPartition, 0L)); +stateRestoreListener.onRestoreEnd(topicPartition, store.name(), 0L); Review Comment: Where does the actual restore happen? Note that the original `restoreState()` is the "bootstrapping phase" before we move to `RUNNING`, and we should preserve this behavior. It seems your PR basically skips the bootstrapping and relies on regular processing to re-read the data? In that case, we would go to `RUNNING` with an empty global store, and lookups might fail because the data is not loaded yet.
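The reviewer's concern is about ordering: when a global store is rebuilt by reprocessing, every record up to the end offset captured at startup should be reprocessed before the thread reports `RUNNING`. A minimal plain-Java sketch of that ordering follows. `GlobalBootstrapSketch`, the in-memory `List` standing in for the global topic, and the `reprocessor` function are all hypothetical and do not reflect the real `GlobalStateManagerImpl`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: rebuild a global store by reprocessing, and only then go to RUNNING.
class GlobalBootstrapSketch {
    enum State { CREATED, RUNNING }

    record Rec(String key, String value) { }

    final Map<String, String> store = new HashMap<>();
    State state = State.CREATED;

    /**
     * Reprocesses every record from offset 0 up to (but excluding) highWatermark,
     * applying the user's processing logic, and only then transitions to RUNNING.
     */
    void bootstrap(List<Rec> topic, long highWatermark, Function<String, String> reprocessor) {
        for (long offset = 0; offset < highWatermark; offset++) {
            Rec rec = topic.get((int) offset);
            store.put(rec.key(), reprocessor.apply(rec.value()));
        }
        // From here on, lookups see everything up to the high watermark captured at startup.
        state = State.RUNNING;
    }

    public static void main(String[] args) {
        List<Rec> topic = List.of(new Rec("a", "x"), new Rec("b", "y"));
        GlobalBootstrapSketch sketch = new GlobalBootstrapSketch();
        sketch.bootstrap(topic, topic.size(), String::toUpperCase);
        System.out.println(sketch.state + " " + sketch.store); // RUNNING {a=X, b=Y}
    }
}
```

Records beyond the captured high watermark are left to regular processing, so the state transition is not delayed indefinitely by a topic that keeps growing.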
Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]
mjsax commented on PR #14157: URL: https://github.com/apache/kafka/pull/14157#issuecomment-1974097523 Thanks for the ping -- yes, it's a struggle to keep up with all the different parallel things... It's good that you keep pushing on this; it helps to keep its priority high. If you look at the discussions, it seems to be a rather large change we want to make, which does not help to speed things up... :( Not sure if you would be interested in helping out yourself and doing a PR? It might still be slow to get reviews, but maybe still faster than what we have right now. Maybe you could try to do this refactoring: https://github.com/apache/kafka/pull/14157#discussion_r1509692788 -- Of course, this is all very deep inside the KS runtime and requires quite some knowledge, so not sure if you would be willing to ramp up on all this...
Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]
mjsax commented on code in PR #14157: URL: https://github.com/apache/kafka/pull/14157#discussion_r1509692788 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java: ## @@ -44,72 +45,98 @@ * @param Type of foreign values * @param Type of joined result of primary and foreign values */ -public class ResponseJoinProcessorSupplier implements ProcessorSupplier, K, VR> { +public class ResponseJoinProcessorSupplier implements ProcessorSupplier, K, VR> { private static final Logger LOG = LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class); -private final KTableValueGetterSupplier valueGetterSupplier; -private final Serializer constructionTimeValueSerializer; +private final KTableValueGetterSupplier rawValueGetterSupplier; +private final Deserializer keyDeserializer; +private final Deserializer leftValueDeserializer; +private final Deserializer rightValueDeserializer; private final Supplier valueHashSerdePseudoTopicSupplier; private final ValueJoiner joiner; private final boolean leftJoin; -public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier valueGetterSupplier, Review Comment: I did not write this code -- not 100% sure why it was done this way. Maybe it was a case of c of existing patterns. Your point seems to be valid, though: we don't need the getter indirection for the "subscription" store. It might be a good thing to split out the refactoring for this part into its own PR.
Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]
mjsax commented on code in PR #14157: URL: https://github.com/apache/kafka/pull/14157#discussion_r1509687465 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueRawStoreWrapper.java: ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are + * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}. + * + * @param The key type + * @param The value type + */ +public class KeyValueRawStoreWrapper implements StateStore { + +public static final long PUT_RETURN_CODE_IS_LATEST += VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED; + +private KeyValueStore timestampedStore = null; +private VersionedKeyValueStore versionedStore = null; + +// same as either timestampedStore or versionedStore above. kept merely as a convenience +// to simplify implementation for methods which do not depend on store type. 
+private StateStore store = null; + +@SuppressWarnings("unchecked") +public KeyValueRawStoreWrapper(final ProcessorContext context, final String storeName) { +try { +// first try timestamped store +timestampedStore = ((WrappedStateStore, K, V>) ((WrappedStateStore, K, V>) context.getStateStore(storeName)).wrapped()).wrapped(); Review Comment: Just cycling back to this -- I'm not sure what the difference between `RawXxxStore` and `WrappedKeyValueState` would be. (Hope you can still remember...)
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
mjsax commented on code in PR #12988: URL: https://github.com/apache/kafka/pull/12988#discussion_r1509672158 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -1140,6 +1145,7 @@ public class StreamsConfig extends AbstractConfig { static { final Map tempProducerDefaultOverrides = new HashMap<>(); tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100"); + tempProducerDefaultOverrides.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "none"); Review Comment: Default is already `null` -- why do we need to set it? ## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ## @@ -530,6 +530,14 @@ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() { assertThat(consumerConfigs.get("internal.leave.group.on.close"), is(false)); } +@Test +public void shouldResetToDefaultIfConsumerAllowAutoCreateTopicsIsOverridden() { Review Comment: This should apply to all consumers, right? Should we extend the test accordingly? Should we also capture the logs and verify that the WARN is printed (not sure if necessary)? ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -1883,3 +1886,20 @@ public static void main(final String[] args) { System.out.println(CONFIG.toHtml(4, config -> "streamsconfigs_" + config)); } } + + +public Map getMainConsumerConfigs(...) { Review Comment: `StreamsConfig` is public API and we cannot just modify it w/o a KIP. -- Also, why do we need this new method to begin with? We already have `getMainConsumerConfigs(...)`.
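The behavior under review (log a WARN and keep the value Streams requires when a user overrides a config that Streams controls) can be sketched independently of `StreamsConfig`. This is a hypothetical helper, not the PR's code: the class and method names, the warning text, and the use of `java.util.logging` are assumptions, and Streams is assumed here to force `allow.auto.create.topics` (the consumer config named in the test) to `false`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper; StreamsConfig's real logic is structured differently.
class ControlledConfigGuard {
    private static final Logger LOG = Logger.getLogger(ControlledConfigGuard.class.getName());

    /**
     * Returns a copy of userConfigs in which every key in controlled is forced to the
     * value Streams requires. Logs a WARN for each user override that gets discarded.
     */
    static Map<String, Object> enforce(Map<String, Object> userConfigs, Map<String, Object> controlled) {
        Map<String, Object> result = new HashMap<>(userConfigs);
        controlled.forEach((key, required) -> {
            Object userValue = userConfigs.get(key);
            if (userValue != null && !userValue.equals(required)) {
                // Illustrative message text, not Kafka's actual log line.
                LOG.warning("Config '" + key + "' is controlled by Streams; ignoring user value "
                    + userValue + " and using " + required);
            }
            result.put(key, required);
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> user = Map.of("allow.auto.create.topics", "true", "max.poll.records", "500");
        Map<String, Object> controlled = Map.of("allow.auto.create.topics", "false");
        System.out.println(enforce(user, controlled).get("allow.auto.create.topics")); // false
    }
}
```

Because the helper logs through a named `java.util.logging.Logger`, a unit test can attach a `Handler` to that logger to assert that exactly one WARN was emitted, which is one way to address the reviewer's question about verifying the log output.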
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1974072824 @VictorvandenHoven -- it seems `KStreamKStreamIntegrationTest.shouldOuterJoin` fails consistently. Can you take a look?
Re: [PR] KAFKA-16100: Add timeout to all the CompletableApplicationEvents [kafka]
kirktrue commented on PR #15455: URL: https://github.com/apache/kafka/pull/15455#issuecomment-1974030036 @cadonna, can you review this PR as part of the larger timeout handling refactoring? Thanks!
[PR] KAFKA-16100: Add timeout to all the CompletableApplicationEvents [kafka]
kirktrue opened a new pull request, #15455: URL: https://github.com/apache/kafka/pull/15455 This is part of the larger task of enforcing the timeouts for application events, per KAFKA-15974. This takes a first step by adding a `Timer` to all of the `CompletableApplicationEvent` subclasses. For the few classes that already included a timeout, this refactors them to use the `Timer` approach instead. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
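Attaching a deadline-based timer when the event is created means later stages can ask how much time is left instead of restarting a full relative timeout at each step. The sketch below illustrates that idea with hypothetical classes (`DeadlineTimer`, `SketchEvent`, `CommitSketchEvent`) and an injectable clock so it can be tested. It is not Kafka's `org.apache.kafka.common.utils.Timer` or the real `CompletableApplicationEvent` hierarchy.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

// Hypothetical deadline tracker; Kafka's own Timer class has a richer API.
class DeadlineTimer {
    private final LongSupplier clockMs;
    private final long deadlineMs;

    DeadlineTimer(LongSupplier clockMs, long timeoutMs) {
        this.clockMs = clockMs;
        long now = clockMs.getAsLong();
        // Saturate instead of overflowing for very large timeouts such as Long.MAX_VALUE.
        this.deadlineMs = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
    }

    /** Milliseconds left before the deadline, never negative. */
    long remainingMs() {
        return Math.max(0, deadlineMs - clockMs.getAsLong());
    }

    boolean isExpired() {
        return remainingMs() == 0;
    }
}

// Hypothetical base class: every completable event carries a timer fixed at creation time.
abstract class SketchEvent<T> {
    final CompletableFuture<T> future = new CompletableFuture<>();
    final DeadlineTimer timer;

    SketchEvent(DeadlineTimer timer) {
        this.timer = timer;
    }
}

// Example subclass; a real event would also carry its request payload.
class CommitSketchEvent extends SketchEvent<Void> {
    CommitSketchEvent(DeadlineTimer timer) {
        super(timer);
    }
}
```

Injecting the clock as a `LongSupplier` keeps the sketch deterministic in tests; production code would pass something like `System::currentTimeMillis`.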
Re: [PR] MINOR: Add 3.7 to Kafka Streams system tests [kafka]
mjsax commented on PR #15443: URL: https://github.com/apache/kafka/pull/15443#issuecomment-1973983318 A different set of system tests (or the same tests with different parameters) fails on the second run. It seems to be a flaky test that we will need to stabilize.
Re: [PR] KAFKA-16190: Member should send full heartbeat when rejoining [kafka]
phong260702 commented on code in PR #15401: URL: https://github.com/apache/kafka/pull/15401#discussion_r1509566835 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -524,10 +524,10 @@ public void testHeartbeatState() { assertEquals(memberId, data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); -assertEquals(-1, data.rebalanceTimeoutMs()); Review Comment: I've added the test, but I'm not sure if this is what it's supposed to be. Can you check?
Re: [PR] KAFKA-16190: Member should send full heartbeat when rejoining [kafka]
phong260702 commented on code in PR #15401: URL: https://github.com/apache/kafka/pull/15401#discussion_r1509566281 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -545,6 +545,15 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); +// Sent all fields if the member is joining/rejoining the group +if (membershipManager.state() == MemberState.JOINING) { Review Comment: Thanks for the suggestion. I've changed it; can you take a look now?
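The rule in this diff (send every field while the member is joining or rejoining, and only changed fields otherwise) can be sketched as follows. `HeartbeatFieldsSketch`, its two-value `SketchMemberState`, and the two fields shown are hypothetical simplifications of `HeartbeatRequestManager`; using `-1` and `null` as "field omitted" sentinels is an assumption of this sketch.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical simplification of heartbeat request building.
class HeartbeatFieldsSketch {
    enum SketchMemberState { JOINING, STABLE }

    // Fields of one outgoing heartbeat; -1 and null mean "omitted" in this sketch.
    record Heartbeat(int rebalanceTimeoutMs, List<String> subscribedTopics) { }

    private final int rebalanceTimeoutMs;
    private List<String> lastSentTopics = null;

    HeartbeatFieldsSketch(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    Heartbeat build(SketchMemberState state, List<String> currentTopics) {
        // A joining or rejoining member sends everything, even if nothing changed locally.
        boolean full = state == SketchMemberState.JOINING;
        // The rebalance timeout never changes, so only a full heartbeat carries it.
        int timeout = full ? rebalanceTimeoutMs : -1;
        // The subscription is sent on a full heartbeat or whenever it changed.
        List<String> topics = (full || !Objects.equals(currentTopics, lastSentTopics)) ? currentTopics : null;
        if (topics != null) {
            lastSentTopics = topics;
        }
        return new Heartbeat(timeout, topics);
    }
}
```

The rejoin case is the point of KAFKA-16190: after a member falls back to `JOINING`, the coordinator may have lost its state, so "unchanged since the last heartbeat" is no longer a safe reason to omit a field.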
[jira] [Updated] (KAFKA-15490) Invalid path provided to the log failure channel upon I/O error when writing broker metadata checkpoint
[ https://issues.apache.org/jira/browse/KAFKA-15490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexandre Dupriez updated KAFKA-15490: -- Affects Version/s: 3.4.1 > Invalid path provided to the log failure channel upon I/O error when writing broker metadata checkpoint > Key: KAFKA-15490 > URL: https://issues.apache.org/jira/browse/KAFKA-15490 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 3.4.0, 3.4.1, 3.5.1 > Reporter: Alexandre Dupriez > Priority: Minor > There is a small bug/typo in the handling of I/O errors when writing the broker metadata checkpoint in {{KafkaServer}}. The path provided to the log dir failure channel is the full path of the checkpoint file, whereas only the log directory is expected ([source|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/server/KafkaServer.scala#L958C8-L961C8]). {code:java} case e: IOException => val dirPath = checkpoint.file.getAbsolutePath logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e){code} As a result, after an {{IOException}} is captured and enqueued in the log dir failure channel ({{{}{}}} is to be replaced with the actual path of the log directory): {code:java} [2023-09-22 17:07:32,052] ERROR Error while writing meta.properties to /meta.properties (kafka.server.LogDirFailureChannel) java.io.IOException{code} the log dir failure handler cannot look up the log directory: {code:java} [2023-09-22 17:07:32,053] ERROR [LogDirFailureHandler]: Error due to (kafka.server.ReplicaManager$LogDirFailureHandler) org.apache.kafka.common.errors.LogDirNotFoundException: Log dir /meta.properties is not found in the config.{code} An immediate fix is to use the {{logDir}} provided to the checkpointing method instead of the path of the metadata file.
> For brokers with only one log directory, this bug prevents the broker from shutting down as expected. The {{LogDirNotFoundException}} then kills the log dir failure handler thread, subsequent {{IOException}}s are not handled, and the broker never stops. {code:java} [2024-02-27 02:13:13,564] INFO [LogDirFailureHandler]: Stopped (kafka.server.ReplicaManager$LogDirFailureHandler){code} Another consideration is whether the {{LogDirNotFoundException}} should terminate the log dir failure handler thread.
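The proposed fix amounts to reporting the log directory rather than the checkpoint file's path. The affected code is Scala; the following Java sketch models the failure with a hypothetical `CheckpointFailureSketch` whose failure channel, like the handler in the logs in the report, only recognizes configured log directories. An `IllegalStateException` stands in for `LogDirNotFoundException`, and no files are touched.

```java
import java.io.File;
import java.util.Set;

// Hypothetical model of the checkpoint error path; not KafkaServer's real code.
class CheckpointFailureSketch {
    private final Set<String> configuredLogDirs;

    CheckpointFailureSketch(Set<String> configuredLogDirs) {
        this.configuredLogDirs = configuredLogDirs;
    }

    // Stand-in for the failure channel: only configured log dirs can be marked offline.
    String markOffline(String dir) {
        if (!configuredLogDirs.contains(dir)) {
            throw new IllegalStateException("Log dir " + dir + " is not found in the config.");
        }
        return dir;
    }

    // Models the IOException handler after writing meta.properties under logDir failed.
    String onCheckpointFailure(String logDir, boolean useFix) {
        File checkpointFile = new File(logDir, "meta.properties");
        // Buggy variant: the checkpoint file path. Fixed variant: the log dir passed to the method.
        String dirPath = useFix ? logDir : checkpointFile.getAbsolutePath();
        return markOffline(dirPath);
    }
}
```

With `useFix = false`, the lookup fails exactly as in the reported `LogDirNotFoundException`, because `<logDir>/meta.properties` is never a configured log directory.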
[jira] [Updated] (KAFKA-15490) Invalid path provided to the log failure channel upon I/O error when writing broker metadata checkpoint
[ https://issues.apache.org/jira/browse/KAFKA-15490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexandre Dupriez updated KAFKA-15490: -- Affects Version/s: 3.4.0
Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]
jeffkbkim commented on code in PR #15446: URL: https://github.com/apache/kafka/pull/15446#discussion_r1509457561 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1830,6 +1832,63 @@ public void onLoaded() { }); } +/** + * Called when the partition is unloaded. Cancel all existing timers for the group. + * ClassicGroup: Complete all awaiting join futures and sync futures. Transition group to Dead. + */ +public void onUnloaded() { +groups.values().forEach(group -> { +switch (group.type()) { +case CONSUMER: +ConsumerGroup consumerGroup = (ConsumerGroup) group; +log.info("[GroupId={}] Unloading group metadata for group epoch {}.", +consumerGroup.groupId(), consumerGroup.groupEpoch()); + +consumerGroup.members().values().forEach(member -> { + timer.cancel(consumerGroupSessionTimeoutKey(consumerGroup.groupId(), member.memberId())); + timer.cancel(consumerGroupRevocationTimeoutKey(consumerGroup.groupId(), member.memberId())); Review Comment: Then we should expect the classic group timers to have already been cancelled too, right?
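The timer bookkeeping in this part of `onUnloaded()` can be illustrated with a hypothetical keyed timer: each member's session and revocation timeouts are registered under keys derived from the group and member ids, and unloading cancels both keys for every member so no stale expiration fires after the partition moves. `KeyedTimerSketch` and its key formats are invented for illustration; they are not the coordinator's timer or its key helpers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed timer: at most one pending task per key.
class KeyedTimerSketch {
    private final Map<String, Runnable> pending = new HashMap<>();

    void schedule(String key, Runnable task) {
        pending.put(key, task); // Rescheduling a key replaces the previous task.
    }

    void cancel(String key) {
        pending.remove(key);
    }

    boolean isScheduled(String key) {
        return pending.containsKey(key);
    }

    // Illustrative key formats only.
    static String sessionTimeoutKey(String groupId, String memberId) {
        return "session-timeout-" + groupId + "-" + memberId;
    }

    static String revocationTimeoutKey(String groupId, String memberId) {
        return "revocation-timeout-" + groupId + "-" + memberId;
    }

    // Cancels both timers for every member of the unloaded group; other groups are untouched.
    void onUnloaded(String groupId, List<String> memberIds) {
        for (String memberId : memberIds) {
            cancel(sessionTimeoutKey(groupId, memberId));
            cancel(revocationTimeoutKey(groupId, memberId));
        }
    }
}
```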
Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]
jeffkbkim commented on code in PR #15446: URL: https://github.com/apache/kafka/pull/15446#discussion_r1509452557 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1830,6 +1832,63 @@ public void onLoaded() { }); } +/** + * Called when the partition is unloaded. Cancel all existing timers for the group. + * ClassicGroup: Complete all awaiting join futures and sync futures. Transition group to Dead. + */ +public void onUnloaded() { +groups.values().forEach(group -> { +switch (group.type()) { +case CONSUMER: +ConsumerGroup consumerGroup = (ConsumerGroup) group; +log.info("[GroupId={}] Unloading group metadata for group epoch {}.", +consumerGroup.groupId(), consumerGroup.groupEpoch()); + +consumerGroup.members().values().forEach(member -> { + timer.cancel(consumerGroupSessionTimeoutKey(consumerGroup.groupId(), member.memberId())); + timer.cancel(consumerGroupRevocationTimeoutKey(consumerGroup.groupId(), member.memberId())); +}); + +break; +case CLASSIC: +ClassicGroup classicGroup = (ClassicGroup) group; +log.info("[GroupId={}] Unloading group metadata for generation {}.", +classicGroup.groupId(), classicGroup.generationId()); + +classicGroup.transitionTo(DEAD); +switch (classicGroup.previousState()) { +case EMPTY: +case DEAD: +break; +case PREPARING_REBALANCE: +classicGroup.allMembers().forEach(member -> { +classicGroup.completeJoinFuture(member, new JoinGroupResponseData() +.setMemberId(member.memberId()) +.setMembers(Collections.emptyList()) +.setGenerationId(NO_GENERATION) Review Comment: Will remove them. Sorry for the confusion; we don't need https://issues.apache.org/jira/browse/KAFKA-16299. I don't know where I saw the default values, but I remember they were set to 0 and null. I removed all NO_GENERATION and NO_PROTOCOL_NAME usages, as they are the defaults.
[jira] [Assigned] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
[ https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar reassigned KAFKA-15878: - Assignee: Anuj Sharma > KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER > Key: KAFKA-15878 > URL: https://issues.apache.org/jira/browse/KAFKA-15878 > Project: Kafka > Issue Type: Improvement > Components: clients > Reporter: Anuj Sharma > Assignee: Anuj Sharma > Priority: Major > Labels: oauth > Fix For: 3.8.0 > h1. Overview > * This issue pertains to the [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism of Kafka authentication. > * Kafka clients can use the [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism by overriding the [custom callback handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod]. > * [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575], available from v3.1, further extends the mechanism with a production-grade implementation. > * Kafka's [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism currently {*}rejects non-JWT (i.e. opaque) tokens{*}. This is because it enforces a more restrictive set of characters than [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] recommends. > * This JIRA can be considered an extension of [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] to support opaque tokens in addition to JWT tokens. > In summary, the following character set should be supported as per the RFC: {code:java} 1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"=" {code}
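The RFC 6750 `b64token` grammar quoted in the issue translates directly into a regular expression: one or more characters from the allowed set, followed by zero or more `=` padding characters. The sketch below validates only that grammar; `B64TokenSketch` is hypothetical, and the validation logic in the linked Kafka PR may be structured differently.

```java
import java.util.regex.Pattern;

// Hypothetical validator for the RFC 6750 b64token syntax:
//   1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"="
class B64TokenSketch {
    // One or more allowed characters, then zero or more '=' padding characters.
    private static final Pattern B64TOKEN = Pattern.compile("[A-Za-z0-9\\-._~+/]+=*");

    static boolean isValid(String token) {
        // matches() requires the whole string to fit the pattern.
        return token != null && B64TOKEN.matcher(token).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("eyJhbGciOi.eyJzdWIiOi.c2lnbmF0dXJl")); // true (JWT-shaped)
        System.out.println(isValid("opaque-token_123~abc+def/ghi=="));    // true (opaque)
        System.out.println(isValid("has space"));                         // false
        System.out.println(isValid("=leading"));                          // false
    }
}
```

Because `=` is only allowed as trailing padding, a value such as `abc=def` is rejected even though every character in it appears somewhere in the grammar.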
Re: [PR] KAFKA-15878: KIP-768 - Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER [kafka]
omkreddy merged PR #14818: URL: https://github.com/apache/kafka/pull/14818
Re: [PR] KAFKA-15878: KIP-768 - Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER [kafka]
philomathanuj commented on PR #14818: URL: https://github.com/apache/kafka/pull/14818#issuecomment-1973769981 Thanks very much, @kirktrue and @omkreddy, for reviewing the PR. Could you please help with merging it?
Re: [PR] KAFKA-16152: Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart [kafka]
mjsax commented on code in PR #15419: URL: https://github.com/apache/kafka/pull/15419#discussion_r1509341591 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -546,8 +546,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { data.setMemberEpoch(membershipManager.memberEpoch()); // InstanceId - only sent if has changed since the last heartbeat Review Comment: I think "setting the id all the time" vs. "omitting it" is an important but orthogonal question. -- The comment says that it must be set "if has changed", but it should never change, right? > So the comment in the schema was wrong Is this about setting the id vs. not setting it, or about the original question of whether it could change?
[jira] [Created] (KAFKA-16320) CreateTopics, DeleteTopics and CreatePartitions differences between Zookeeper and KRaft
Emanuele Sabellico created KAFKA-16320: -- Summary: CreateTopics, DeleteTopics and CreatePartitions differences between Zookeeper and KRaft Key: KAFKA-16320 URL: https://issues.apache.org/jira/browse/KAFKA-16320 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Emanuele Sabellico Test number 0081, which exercises these operations, fails in librdkafka when run against KRaft but not against Zookeeper. The test sets the operation timeout to 0 and expects the operations to be executed asynchronously. With Zookeeper, the returned error was REQUEST_TIMED_OUT, which librdkafka converts to NO_ERROR when the operation timeout is <= 0. With KRaft, NO_ERROR is returned instead, but the topics are not created or deleted. Also, passing an invalid configuration option returns NO_ERROR instead of INVALID_CONFIG, which is what happens with Zookeeper, or with KRaft when the operation timeout is > 0. https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L5174C9-L5174C29
{code:java}
/* For non-blocking CreateTopicsRequests the broker
 * will returned REQUEST_TIMED_OUT for topics
 * that were triggered for creation -
 * we hide this error code from the application
 * since the topic creation is in fact in progress. */
if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
    rd_kafka_confval_get_int(_req->rko_u.admin_request
                                  .options.operation_timeout) <= 0) {
        error_code  = RD_KAFKA_RESP_ERR_NO_ERROR;
        this_errstr = NULL;
}
{code}
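To make the masking explicit, here is a hypothetical Java transliteration of the quoted librdkafka branch. The `AdminErrorMapping` class and its `Err` enum are illustrative stand-ins, not a real Kafka or librdkafka API. The branch only ever rewrites REQUEST_TIMED_OUT. So when the application sees NO_ERROR with KRaft, including in place of INVALID_CONFIG, that result must come from the broker itself.

```java
// Hypothetical Java transliteration of the librdkafka branch quoted above; the
// enum and method names are illustrative, not part of Kafka or librdkafka.
class AdminErrorMapping {
    enum Err { NO_ERROR, REQUEST_TIMED_OUT, INVALID_CONFIG }

    // With operation_timeout <= 0 the client asked for a non-blocking request, so a
    // REQUEST_TIMED_OUT from the broker is read as "accepted, still in progress"
    // and reported to the application as success.
    static Err effectiveError(Err brokerError, int operationTimeoutMs) {
        if (brokerError == Err.REQUEST_TIMED_OUT && operationTimeoutMs <= 0) {
            return Err.NO_ERROR;
        }
        return brokerError;
    }

    public static void main(String[] args) {
        // ZooKeeper-style response: the timeout is masked as success.
        System.out.println(effectiveError(Err.REQUEST_TIMED_OUT, 0));
        // Validation errors are never rewritten by this branch.
        System.out.println(effectiveError(Err.INVALID_CONFIG, 0));
    }
}
```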
Re: [PR] Add Deletion Reasons in KRaft Snapshot Deletion Logging [kafka]
jsancio commented on code in PR #15450: URL: https://github.com/apache/kafka/pull/15450#discussion_r1509262172 ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -179,7 +180,7 @@ final class KafkaMetadataLog private ( (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) } -removeSnapshots(forgottenSnapshots) +removeSnapshots(forgottenSnapshots,RetentionMsBreach()) Review Comment: Missing space between `,` and `RetentionMsBreach`. ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -501,6 +524,24 @@ final class KafkaMetadataLog private ( } } + private def removeSnapshots( +expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], +reason: SnapshotDeletionReason, + ): Unit = { +expiredSnapshots.foreach { case (snapshotId, _) => + reason.logReason(snapshotId) + Snapshots.markForDelete(log.dir.toPath, snapshotId) +} + +if (expiredSnapshots.nonEmpty) { + scheduler.scheduleOnce( +"delete-snapshot-files", +() => KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots, this), +config.fileDeleteDelayMs + ) +} + } Review Comment: This looks like a duplicate of the method `removeSnapshots(mutable.TreeMap[_, _])`. ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -677,4 +692,20 @@ object KafkaMetadataLog extends Logging { Snapshots.deleteIfExists(logDir, snapshotId) } } + + trait SnapshotDeletionReason { +def logReason(snapshotId: OffsetAndEpoch): Unit + } + + final case class RetentionMsBreach() extends SnapshotDeletionReason { +override def logReason(snapshotId: OffsetAndEpoch): Unit = { + info(s"Marking snapshot $snapshotId for deletion because the age is too old") +} + } + + final case class RetentionSizeBreach() extends SnapshotDeletionReason { +override def logReason(snapshotId: OffsetAndEpoch): Unit = { + info(s"Marking snapshot $snapshotId for deletion because the size is too big") +} Review Comment: Similar comment here. 
The reason should include this information: `log.size + snapshotTotalSize > config.retentionMaxBytes` From: https://github.com/apache/kafka/pull/15450/files#diff-b332f85b04775c821226b6f704e91d51f9647f29ba73dace65b99cf36f6b9ceaR477 ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -484,9 +506,10 @@ final class KafkaMetadataLog private ( * Rename the given snapshots on the log directory. Asynchronously, close and delete the * given snapshots after some delay. */ + private def removeSnapshots( -expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] - ): Unit = { + expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], + ): Unit = { Review Comment: This indentation doesn't look correct. I think you want to revert this change. ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -359,6 +360,27 @@ final class KafkaMetadataLog private ( deleted } + def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch,reason: SnapshotDeletionReason): Boolean = { +val (deleted, forgottenSnapshots) = snapshots synchronized { + latestSnapshotId().asScala match { +case Some(latestSnapshotId) if + snapshots.contains(snapshotId) && +startOffset < snapshotId.offset && +snapshotId.offset <= latestSnapshotId.offset && +log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) => + // Delete all segments that have a "last offset" less than the log start offset + val deletedSegments = log.deleteOldSegments() + // Remove older snapshots from the snapshots cache + val forgottenSnapshots = forgetSnapshotsBefore(snapshotId) + (deletedSegments != 0 || forgottenSnapshots.nonEmpty, forgottenSnapshots) +case _ => + (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + } +} +removeSnapshots(forgottenSnapshots, reason) +deleted + } Review Comment: This looks like a duplicate of `deleteBeforeSnapshot(OffsetAndEpoch)`. 
If tests are calling this method directly, let's add an "unknown" reason that the tests can use when deleting snapshots. You should be able to avoid this code duplication with this change:
```scala
override def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch): Boolean = {
  deleteBeforeSnapshot(snapshotId, UnknownReason())
}
```
## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -348,9 +349,9 @@ final class KafkaMetadataLog private ( snapshotId.offset <= latestSnapshotId.offset && log.maybeIncrementLogStartOffset(snapshotId.offset,
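The following sketch combines the two review suggestions. It is written in Java to keep the examples in one language, although the PR itself is Scala. The size-based reason carries the values from `log.size + snapshotTotalSize > config.retentionMaxBytes`, and the one-argument overload delegates with an unknown reason instead of duplicating the deletion logic. All names are hypothetical mirrors of the reviewed code, and snapshot ids are plain strings rather than `OffsetAndEpoch`.

```java
// Hypothetical Java sketch of the review suggestions; names mirror the Scala code
// under review, but snapshot ids are plain strings instead of OffsetAndEpoch.
class SnapshotDeletionSketch {
    interface SnapshotDeletionReason {
        String describe(String snapshotId);
    }

    // Age-based retention: report the observed age and the configured limit.
    static final class RetentionMsBreach implements SnapshotDeletionReason {
        private final long ageMs;
        private final long retentionMs;

        RetentionMsBreach(long ageMs, long retentionMs) {
            this.ageMs = ageMs;
            this.retentionMs = retentionMs;
        }

        @Override
        public String describe(String snapshotId) {
            return "Marking snapshot " + snapshotId + " for deletion because its age (" + ageMs
                + " ms) exceeds the retention (" + retentionMs + " ms)";
        }
    }

    // Size-based retention: surface the exact comparison that triggered the deletion.
    static final class RetentionSizeBreach implements SnapshotDeletionReason {
        private final long logSize;
        private final long snapshotTotalSize;
        private final long retentionMaxBytes;

        RetentionSizeBreach(long logSize, long snapshotTotalSize, long retentionMaxBytes) {
            this.logSize = logSize;
            this.snapshotTotalSize = snapshotTotalSize;
            this.retentionMaxBytes = retentionMaxBytes;
        }

        @Override
        public String describe(String snapshotId) {
            return "Marking snapshot " + snapshotId + " for deletion because log.size (" + logSize
                + ") + snapshotTotalSize (" + snapshotTotalSize + ") > retentionMaxBytes ("
                + retentionMaxBytes + ")";
        }
    }

    // Fallback for callers, such as tests, that have no specific reason.
    static final class UnknownReason implements SnapshotDeletionReason {
        @Override
        public String describe(String snapshotId) {
            return "Marking snapshot " + snapshotId + " for deletion for an unknown reason";
        }
    }

    // The one-argument overload delegates, so the deletion logic exists only once.
    static String deleteBeforeSnapshot(String snapshotId) {
        return deleteBeforeSnapshot(snapshotId, new UnknownReason());
    }

    static String deleteBeforeSnapshot(String snapshotId, SnapshotDeletionReason reason) {
        String logLine = reason.describe(snapshotId);
        // Real code would mark the snapshot for deletion and schedule file removal here.
        return logLine;
    }

    public static void main(String[] args) {
        System.out.println(deleteBeforeSnapshot("snapshot-100-3", new RetentionSizeBreach(700, 400, 1000)));
        System.out.println(deleteBeforeSnapshot("snapshot-50-2"));
    }
}
```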
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
philipnee commented on code in PR #15437: URL: https://github.com/apache/kafka/pull/15437#discussion_r1509285840 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -567,6 +568,28 @@ public void testCommitAsyncLeaderEpochUpdate() { verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class)); } +@Test +public void testCommitAsyncPropagatesFencedException() { Review Comment: `poll`, `commitSync` and `commitAsync` should all throw `FencedInstanceIdException`. Can you test that each of these APIs throws the correct exception after the instance has been fenced? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -792,8 +801,8 @@ public void commitAsync(Map offsets, OffsetCo } private CompletableFuture commit(final CommitEvent commitEvent) { -maybeInvokeCommitCallbacks(); maybeThrowFencedInstanceException(); +maybeInvokeCommitCallbacks(); Review Comment: The change in order perhaps mirrors this snippet in the ConsumerCoordinator:
```
if (asyncCommitFenced.get()) {
    throw new FencedInstanceIdException("Get fenced exception for group.instance.id "
        + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
        + ", current member.id is " + memberId());
}
while (true) {
    OffsetCommitCompletion completion = completedOffsetCommits.poll();
    if (completion == null) {
        break;
    }
    completion.invoke();
}
```
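The ordering question can be illustrated with a small, self-contained Java sketch. It assumes a simplified consumer with a fenced flag and a queue of completed commit callbacks, loosely modelled on the ConsumerCoordinator snippet above. `FencedCommitOrdering` and its local `FencedInstanceIdException` are hypothetical stand-ins for the real Kafka classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch (not Kafka's classes): the fenced check runs before queued
// commit callbacks are drained, mirroring the ConsumerCoordinator ordering.
class FencedCommitOrdering {
    static class FencedInstanceIdException extends RuntimeException {
        FencedInstanceIdException(String message) {
            super(message);
        }
    }

    private final AtomicBoolean asyncCommitFenced = new AtomicBoolean(false);
    private final Queue<Runnable> completedOffsetCommits = new ArrayDeque<>();

    void markFenced() {
        asyncCommitFenced.set(true);
    }

    void enqueueCompletion(Runnable callback) {
        completedOffsetCommits.add(callback);
    }

    void commit() {
        // 1. Fail fast if this static member has been fenced.
        if (asyncCommitFenced.get()) {
            throw new FencedInstanceIdException("instance has been fenced");
        }
        // 2. Only then invoke callbacks of previously completed commits.
        Runnable completion;
        while ((completion = completedOffsetCommits.poll()) != null) {
            completion.run();
        }
        // A real consumer would now send the new commit request.
    }

    public static void main(String[] args) {
        FencedCommitOrdering consumer = new FencedCommitOrdering();
        int[] invoked = {0};
        consumer.enqueueCompletion(() -> invoked[0]++);
        consumer.markFenced();
        try {
            consumer.commit();
        } catch (FencedInstanceIdException e) {
            System.out.println("fenced; callbacks invoked = " + invoked[0]);
        }
    }
}
```

With the check first, a fenced member surfaces the fencing error immediately, and pending callbacks are not run beforehand.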
Re: [PR] KAFKA-16152: Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart [kafka]
dajac commented on code in PR #15419: URL: https://github.com/apache/kafka/pull/15419#discussion_r1509278562 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -546,8 +546,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { data.setMemberEpoch(membershipManager.memberEpoch()); // InstanceId - only sent if has changed since the last heartbeat Review Comment: The current implementation actually requires the static member id to be set all the time if the consumer uses static membership. So the comment in the schema was wrong. I need to go back to the implementation to see whether we could relax it and only require it in the first request, when joining. I will check and let you guys know.
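Following the clarification above, here is a minimal sketch of a heartbeat builder that sends the instance id on every request whenever static membership is configured. `HeartbeatInstanceId` and `HeartbeatData` are hypothetical simplifications, not the real `HeartbeatRequestManager` or `ConsumerGroupHeartbeatRequestData`.

```java
import java.util.Optional;

// Hypothetical sketch, not Kafka's HeartbeatRequestManager: a static member sets
// its instance id on every heartbeat instead of only "if it has changed".
class HeartbeatInstanceId {
    static final class HeartbeatData {
        String instanceId; // null means the member is not a static member
        int memberEpoch;
    }

    // Taken from configuration (group.instance.id), so it is fixed for the consumer's lifetime.
    private final Optional<String> groupInstanceId;

    HeartbeatInstanceId(Optional<String> groupInstanceId) {
        this.groupInstanceId = groupInstanceId;
    }

    HeartbeatData buildRequestData(int memberEpoch) {
        HeartbeatData data = new HeartbeatData();
        data.memberEpoch = memberEpoch;
        // Always sent for static members, matching the current broker requirement.
        data.instanceId = groupInstanceId.orElse(null);
        return data;
    }

    public static void main(String[] args) {
        HeartbeatInstanceId manager = new HeartbeatInstanceId(Optional.of("consumer-a"));
        System.out.println(manager.buildRequestData(0).instanceId);
        System.out.println(manager.buildRequestData(5).instanceId);
    }
}
```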
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
philipnee commented on PR #15437: URL: https://github.com/apache/kafka/pull/15437#issuecomment-1973537076 Hey, thanks for the PR. I noticed a subtle thing here: it seems we never null-check `interceptors` in the async consumer. Can `interceptors` ever be null?
```
try {
    Timer requestTimer = time.timer(timeout.toMillis());
    // Commit with a timer to control how long the request should be retried until it
    // gets a successful response or non-retriable error.
    CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));
    ConsumerUtils.getResult(commitFuture, requestTimer);
->  interceptors.onCommit(offsets);
}
```
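If `interceptors` could ever be null, the post-commit hook would need a guard. Below is a hypothetical Java sketch. A minimal `CommitInterceptor` interface stands in for Kafka's interceptor types, and topic-partitions are written as plain strings.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a null-safe post-commit hook. CommitInterceptor is a
// minimal stand-in, not Kafka's ConsumerInterceptor API.
class InterceptorGuard {
    interface CommitInterceptor {
        void onCommit(Map<String, Long> offsets);
    }

    private final List<CommitInterceptor> interceptors; // may be null in this sketch

    InterceptorGuard(List<CommitInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    void afterSuccessfulCommitSync(Map<String, Long> offsets) {
        if (interceptors == null) {
            return; // nothing registered; avoids a NullPointerException
        }
        for (CommitInterceptor interceptor : interceptors) {
            interceptor.onCommit(offsets);
        }
    }

    public static void main(String[] args) {
        new InterceptorGuard(null).afterSuccessfulCommitSync(Map.of("topic-0", 42L));
        CommitInterceptor printer = offsets -> System.out.println("committed " + offsets);
        new InterceptorGuard(List.of(printer)).afterSuccessfulCommitSync(Map.of("topic-0", 42L));
    }
}
```

An alternative design is to initialize the field to an empty list in the constructor. That makes the null check unnecessary and is often simpler to reason about.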
[PR] MINOR: AddPartitionsToTxnManager performance optimizations [kafka]
splett2 opened a new pull request, #15454: URL: https://github.com/apache/kafka/pull/15454 A few minor optimizations: 1. Cache the interbroker listener name instead of computing it each time. The value of the interbroker listener name cannot change without a process restart. 2. We're currently grabbing all partitions for the transaction state topic in getTransactionCoordinator. Instead, just query the partition we care about. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
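Here is a rough sketch of both optimizations. It assumes the coordinator partition is chosen as the absolute value of the transactional id's hash code modulo the transaction state topic's partition count, with `Integer.MIN_VALUE` mapped to 0. A plain map stands in for the metadata cache. `TxnCoordinatorLookup` and its methods are hypothetical, not the code in PR #15454.

```java
import java.util.Map;

// Hypothetical sketch of the two optimizations; not the PR's code.
class TxnCoordinatorLookup {
    // Computed once: the inter-broker listener cannot change without a process restart.
    private final String interBrokerListenerName;
    private final int transactionTopicPartitionCount;

    TxnCoordinatorLookup(String interBrokerListenerName, int transactionTopicPartitionCount) {
        this.interBrokerListenerName = interBrokerListenerName;
        this.transactionTopicPartitionCount = transactionTopicPartitionCount;
    }

    String listenerName() {
        return interBrokerListenerName;
    }

    // Assumed mapping: abs(hashCode) % partitionCount, with Integer.MIN_VALUE mapped
    // to 0 because Math.abs(Integer.MIN_VALUE) is still negative.
    static int partitionFor(String transactionalId, int partitionCount) {
        int hash = transactionalId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % partitionCount;
    }

    // Look up only the one partition we need instead of every partition of the topic.
    Integer coordinatorFor(String transactionalId, Map<Integer, Integer> leaderByPartition) {
        return leaderByPartition.get(partitionFor(transactionalId, transactionTopicPartitionCount));
    }

    public static void main(String[] args) {
        TxnCoordinatorLookup lookup = new TxnCoordinatorLookup("REPLICATION", 50);
        int partition = partitionFor("my-transactional-id", 50);
        System.out.println("partition " + partition + " via listener " + lookup.listenerName());
        System.out.println("coordinator " + lookup.coordinatorFor("my-transactional-id", Map.of(partition, 1)));
    }
}
```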
[jira] [Created] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers
AlexeyASF created KAFKA-16319: - Summary: Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers Key: KAFKA-16319 URL: https://issues.apache.org/jira/browse/KAFKA-16319 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.6.1 Reporter: AlexeyASF
h2. Context
Kafka Streams applications periodically send {{DeleteRecords}} requests via the {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}} method. Such requests may involve more than one topic (or partition), and each partition's deletion is supposed to be sent to that partition's leader broker.
h2. Observed behaviour
When a {{DeleteRecords}} request includes more than one topic (say, {{topic1}} and {{topic2}}) and these topics are led by different brokers (say, {{broker1}} and {{broker2}} respectively), the request is sent to only one broker (say, {{broker1}}), leading to partial not_leader_or_follower errors. Because the request was not entirely successful ({{topic1}} succeeds but {{topic2}} does not), it is retried with the _same_ arguments against the _same_ broker ({{broker1}}), so the response is partially faulty again and again. It can (and does) also happen that there is a “mirrored” half-faulty request, in this case to {{broker2}}, where the {{topic2}} operation succeeds but the {{topic1}} operation fails.
Here is an anonymised log example from a production system (“direct” and “mirrored” requests, one after another):
{code:java}
[AdminClient clientId=worker-admin] Sending DeleteRecordsRequestData(topics=[
  DeleteRecordsTopic(
    name='topic1',
    partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
  ),
  DeleteRecordsTopic(
    name='topic2',
    partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
  )], timeoutMs=6) to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
correlationId=42003907, timeoutMs=3
[AdminClient clientId=worker-admin] Sending DeleteRecordsRequestData(topics=[
  DeleteRecordsTopic(
    name='topic1',
    partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
  ),
  DeleteRecordsTopic(
    name='topic2',
    partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
  )], timeoutMs=6) to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
correlationId=42003906, timeoutMs=3
{code}
Such a request results in the following response (shown here only for the "direct" request):
{code:java}
[AdminClient clientId=worker-admin] Call(
  callName=deleteRecords(api=DELETE_RECORDS),
  deadlineMs=...,
  tries=..., // Can be hundreds
  nextAllowedTryMs=...) got response DeleteRecordsResponseData(
  throttleTimeMs=0,
  topics=[
    DeleteRecordsTopicResult(
      name='topic2',
      partitions=[DeleteRecordsPartitionResult(
        partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the errorCode 6, which is not_leader_or_follower
    DeleteRecordsTopicResult(
      name='topic1',
      partitions=[DeleteRecordsPartitionResult(
        partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the errorCode 0, which means the operation was successful
  ]
)
{code}
h2. Expected behaviour
{{DeleteRecords}} requests should be sent to the corresponding partitions' leader brokers when more than one topic/partition is involved and they are led by different brokers.
h2. Notes
* _Presumably_ introduced in 3.6.1 via [https://github.com/apache/kafka/pull/13760].
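The expected behaviour amounts to splitting one logical DeleteRecords call into one request per leader. The following minimal Java sketch assumes partitions are identified by strings such as `topic1-5` and that leaders are already known from metadata. `DeleteRecordsRouting` is hypothetical and is not the admin client's implementation.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the expected behaviour, not the admin client's code:
// group the requested deletions by leader so each broker only gets its own partitions.
class DeleteRecordsRouting {
    // Returns leader broker id -> ("topic-partition" -> delete-before offset).
    static Map<Integer, Map<String, Long>> groupByLeader(
            Map<String, Long> offsetsByPartition, Map<String, Integer> leaderByPartition) {
        Map<Integer, Map<String, Long>> requests = new HashMap<>();
        for (Map.Entry<String, Long> entry : offsetsByPartition.entrySet()) {
            Integer leader = leaderByPartition.get(entry.getKey());
            if (leader == null) {
                throw new IllegalStateException("No known leader for " + entry.getKey());
            }
            requests.computeIfAbsent(leader, id -> new LinkedHashMap<>())
                    .put(entry.getKey(), entry.getValue());
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("topic1-5", 88017574L);
        offsets.put("topic2-5", 243841L);
        // Leaders as in the logs above: topic1 on broker id 2, topic2 on broker id 4.
        Map<String, Integer> leaders = Map.of("topic1-5", 2, "topic2-5", 4);
        System.out.println(groupByLeader(offsets, leaders));
    }
}
```

With this grouping, each broker receives only the partition it leads, so neither request can fail with NOT_LEADER_OR_FOLLOWER because of a misrouted partition.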
Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]
lucasbru merged PR #15440: URL: https://github.com/apache/kafka/pull/15440
Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]
CharlyRien commented on PR #14157: URL: https://github.com/apache/kafka/pull/14157#issuecomment-1973401741 I know that I probably sound pushy, but I’m curious if there’s anything we can do to continue the work you initiated, @mjsax. I understand you don't have a lot of time and have other priorities, and that's perfectly fine. However, on our side, we have a costly KStream deployment process (blue/green deployment to avoid disrupting the live flow of events), and because of this problem we are forced to use it each time we add a new field, even though our topology has not changed at all between versions. Thank you nonetheless for your time!
[jira] [Commented] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
[ https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822617#comment-17822617 ] Anuj Sharma commented on KAFKA-15878: - [~kirktrue], thanks so much for assigning the ticket to yourself in the meantime. For some reason, I don't have permission to assign the ticket to myself. I have raised a ticket with Apache Infra to get this sorted (though I am not sure it's the best place to address my query). > KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER > > > Key: KAFKA-15878 > URL: https://issues.apache.org/jira/browse/KAFKA-15878 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Anuj Sharma >Assignee: Kirk True >Priority: Major > Labels: oauth > Fix For: 3.8.0 > > > h1. Overview > * This issue pertains to the > [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism of Kafka authentication. > * Kafka clients can use the [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism by overriding the [custom callback handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod]. > * > [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575], > available from v3.1, further extends the mechanism with a production-grade > implementation. > * Kafka's > [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism currently {*}rejects non-JWT (i.e. opaque) tokens{*}, because it accepts a > more restrictive set of characters than > [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] > recommends. > * This JIRA can be considered an extension of > [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] > to support opaque tokens in addition to JWT tokens. 
> > In summary, the following character set should be supported, as per the RFC: > {code:java} > 1*( ALPHA / DIGIT / >"-" / "." / "_" / "~" / "+" / "/" ) *"=" > {code}
Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]
nikramakrishnan commented on code in PR #15241: URL: https://github.com/apache/kafka/pull/15241#discussion_r1509082067 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -522,6 +522,18 @@ public TimeIndex timeIndex() { // Visible for testing public TransactionIndex txnIndex() { +if (txnIndex.isClosed()) { Review Comment: Nit: Maybe it makes sense to add a debug log here to indicate that the index was closed and is being reopened? Might be useful to debug any performance issues.
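One way to act on the nit is to reopen lazily and log at debug level. This Java sketch uses `java.lang.System.Logger` and a plain `FileChannel` as stand-ins for Kafka's logger and `TransactionIndex`. `ReopenableIndex` is hypothetical.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: reopen a closed index file on access and emit a debug log,
// using System.Logger and FileChannel in place of Kafka's logger and TransactionIndex.
class ReopenableIndex {
    private static final System.Logger LOG = System.getLogger(ReopenableIndex.class.getName());

    private final Path file;
    private FileChannel channel;

    ReopenableIndex(Path file) throws IOException {
        this.file = file;
        this.channel = FileChannel.open(file, StandardOpenOption.READ);
    }

    synchronized FileChannel channel() throws IOException {
        if (!channel.isOpen()) {
            // The debug line makes unexpected reopen churn visible when chasing latency.
            LOG.log(System.Logger.Level.DEBUG, "Index file {0} was closed; reopening it", file);
            channel = FileChannel.open(file, StandardOpenOption.READ);
        }
        return channel;
    }

    synchronized void close() throws IOException {
        channel.close();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("txn-index", ".idx");
        ReopenableIndex index = new ReopenableIndex(tmp);
        index.close();
        System.out.println("reopened on access: " + index.channel().isOpen());
        index.close();
        Files.delete(tmp);
    }
}
```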
Re: [PR] MINOR: Add version 3.7 to the Kafka Streams system tests [kafka]
gaurav-narula commented on PR #15453: URL: https://github.com/apache/kafka/pull/15453#issuecomment-1973286061 This seems like a duplicate of https://github.com/apache/kafka/pull/15443?
Re: [PR] MINOR: Add 3.7.0 to core and client's upgrade compatibility tests [kafka]
stanislavkozlovski commented on PR #15452: URL: https://github.com/apache/kafka/pull/15452#issuecomment-1973269982 Tests are running at: - https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6084/ - https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6085/
Re: [PR] MINOR: Add version 3.7 to the Kafka Streams system tests [kafka]
stanislavkozlovski commented on PR #15453: URL: https://github.com/apache/kafka/pull/15453#issuecomment-1973267141 - The system test run is at https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6082/ - I simply copy-pasted the files from the `36` folder and renamed the three log occurrences from `3.6` to `3.7`.
[PR] MINOR: Add version 3.7 to the Kafka Streams system tests [kafka]
stanislavkozlovski opened a new pull request, #15453: URL: https://github.com/apache/kafka/pull/15453 This patch adds the 3.7 version to the Kafka Streams system tests, as per the release wiki and past examples.
Re: [PR] MINOR: update kraft_upgrade_test to create a new topic after metadata upgrade [kafka]
gaurav-narula commented on code in PR #15451: URL: https://github.com/apache/kafka/pull/15451#discussion_r1509056087 ## tests/kafkatest/tests/core/kraft_upgrade_test.py: ## @@ -108,6 +108,27 @@ def run_upgrade(self, from_kafka_version): assert len(cluster_id) == 22 assert self.kafka.check_protocol_errors(self) +# Ensure we can create another topic and produce/consume to/from it +new_topic_cfg = { +"topic": "test-topic-2", +"partitions": self.partitions, +"replication-factor": self.replication_factor, +"configs": {"min.insync.replicas": 2} Review Comment: Addressed in https://github.com/apache/kafka/pull/15451/commits/6776bdf9751198294de2f8c4cf7c97351f34f97a
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
soarez commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1509052378 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2859,10 +2867,10 @@ class ReplicaManager(val config: KafkaConfig, "local leaders.") replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet) localLeaders.forKeyValue { (tp, info) => - getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => + val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) + getOrCreatePartition(tp, delta, info.topicId, isLocalFollower = false, partitionAssignedDirectoryId).foreach { case (partition, isNew) => Review Comment: > `isLocalFollower = false` I don't think this is OK. Setting this to false disables the fix when the broker is the leader, which can happen with RF=1, or with RF=N when the broker comes back into the ISR (e.g. because there were no new messages and the other replicas are all shut down). I had a conversation with @gaurav-narula about this, and the plan is to apply this fix earlier, when the logs are being loaded.
[PR] MINOR: Add 3.7.0 to core and client's upgrade compatibility tests [kafka]
stanislavkozlovski opened a new pull request, #15452: URL: https://github.com/apache/kafka/pull/15452 Since Kafka 3.7.0 was just released, this patch extends the core and client upgrade and compatibility tests to cover that version as well.
Re: [PR] MINOR: update kraft_upgrade_test to create a new topic after metadata upgrade [kafka]
soarez commented on code in PR #15451: URL: https://github.com/apache/kafka/pull/15451#discussion_r1509037381 ## tests/kafkatest/tests/core/kraft_upgrade_test.py: ## @@ -108,6 +108,27 @@ def run_upgrade(self, from_kafka_version): assert len(cluster_id) == 22 assert self.kafka.check_protocol_errors(self) +# Ensure we can create another topic and produce/consume to/from it +new_topic_cfg = { +"topic": "test-topic-2", +"partitions": self.partitions, +"replication-factor": self.replication_factor, +"configs": {"min.insync.replicas": 2} Review Comment: Don't we want to consider `self.replication_factor` here? Maybe `min(2, self.replication_factor)`.
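The point of `min(2, self.replication_factor)` can be shown in a few lines. Below is a hypothetical Java sketch (the test itself is Python). With RF=1, `min.insync.replicas=2` can never be satisfied, so every acks=all produce request would be rejected with NOT_ENOUGH_REPLICAS.

```java
import java.util.Map;

// Hypothetical sketch of the review suggestion; the real test is Python.
class MinIsrConfig {
    static Map<String, String> newTopicConfigs(int replicationFactor) {
        // Never require more in-sync replicas than the topic can have.
        int minIsr = Math.min(2, replicationFactor);
        return Map.of("min.insync.replicas", Integer.toString(minIsr));
    }

    // An acks=all produce request needs at least min.insync.replicas members in the ISR;
    // otherwise the broker rejects it with NOT_ENOUGH_REPLICAS.
    static boolean acksAllCanSucceed(int isrSize, int minInsyncReplicas) {
        return isrSize >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        for (int rf : new int[] {1, 3}) {
            int minIsr = Integer.parseInt(newTopicConfigs(rf).get("min.insync.replicas"));
            // Best case: every replica is in sync, so the ISR size equals the replication factor.
            System.out.println("RF=" + rf + " min.insync.replicas=" + minIsr
                + " acks=all possible: " + acksAllCanSucceed(rf, minIsr));
        }
    }
}
```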
[jira] [Commented] (KAFKA-16031) Enabling testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory for tiered storage after supporting JBOD
[ https://issues.apache.org/jira/browse/KAFKA-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822577#comment-17822577 ] PoAn Yang commented on KAFKA-16031: --- It's OK. I just tried enabling it and running the test, and I got the following error:
{code:java}
org.apache.kafka.common.errors.InvalidReplicaDirectoriesException: The lengths for replicas and directories do not match: PartitionRecord(partitionId=0, topicId=fFJBx0OmQG-UqeaT6YaSwA, replicas=[1, 2], isr=[1, 2], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=0, leaderEpoch=0, partitionEpoch=0, directories=[Nbi0tSXYT-S8rQY1EtepKA], eligibleLeaderReplicas=null, lastKnownElr=null)
{code}
Which issues can I track to know whether it has been enabled? Thanks. > Enabling testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory for > tiered storage after supporting JBOD > > > Key: KAFKA-16031 > URL: https://issues.apache.org/jira/browse/KAFKA-16031 > Project: Kafka > Issue Type: Test > Components: Tiered-Storage >Reporter: Luke Chen >Assignee: PoAn Yang >Priority: Major > > Currently, tiered storage doesn't support JBOD (multiple log dirs). The test > testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory requires multiple > log dirs to run. We should enable it for tiered storage once tiered storage > supports JBOD.
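The error reflects an invariant of directory assignment: a `PartitionRecord` must list one directory per replica. Below is a minimal Java sketch of that check, assuming plain lists for replicas and directory ids. `ReplicaDirectoriesCheck` is hypothetical and does not reproduce Kafka's validation code; for example, it does not model metadata versions that carry no directory assignments.

```java
import java.util.List;

// Hypothetical sketch of the invariant behind InvalidReplicaDirectoriesException;
// not Kafka's validation code.
class ReplicaDirectoriesCheck {
    // Each replica needs exactly one assigned directory, so the lists must line up.
    static void validate(List<Integer> replicas, List<String> directories) {
        if (replicas.size() != directories.size()) {
            throw new IllegalArgumentException("The lengths for replicas and directories do not match: replicas="
                + replicas + ", directories=" + directories);
        }
    }

    public static void main(String[] args) {
        // Placeholder directory ids: one per replica, so this passes.
        validate(List.of(1, 2), List.of("dirA", "dirB"));
        try {
            // Same shape as the failing PartitionRecord: two replicas, one directory.
            validate(List.of(1, 2), List.of("Nbi0tSXYT-S8rQY1EtepKA"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```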
[PR] MINOR: update kraft_upgrade_test to create a new topic after metadata upgrade [kafka]
gaurav-narula opened a new pull request, #15451: URL: https://github.com/apache/kafka/pull/15451 Updates kraft_upgrade_test to create a new topic after upgrading the metadata version, and tries to produce to and consume from it. This may act as a regression test for scenarios like KAFKA-16162, where newly created topics are not assigned a leader.
Re: [PR] MINOR: update kraft_upgrade_test to create a new topic after metadata upgrade [kafka]
gaurav-narula commented on PR #15451: URL: https://github.com/apache/kafka/pull/15451#issuecomment-1973222033 CC: @soarez @pprovenzano @showuon
[jira] [Commented] (KAFKA-16031) Enabling testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory for tiered storage after supporting JBOD
[ https://issues.apache.org/jira/browse/KAFKA-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822553#comment-17822553 ] Luke Chen commented on KAFKA-16031: --- Sorry [~yangpoan], I had another look: this feature is still not supported, so we should not enable the test yet.
[jira] [Commented] (KAFKA-16031) Enabling testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory for tiered storage after supporting JBOD
[ https://issues.apache.org/jira/browse/KAFKA-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822541#comment-17822541 ] Luke Chen commented on KAFKA-16031: --- Go ahead. Thanks.
[jira] [Updated] (KAFKA-16222) KRaft Migration: Incorrect default user-principal quota after migration
[ https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dominik updated KAFKA-16222: Summary: KRaft Migration: Incorrect default user-principal quota after migration (was: Incorrect default user-principal quota after migration) > KRaft Migration: Incorrect default user-principal quota after migration > --- > > Key: KAFKA-16222 > URL: https://issues.apache.org/jira/browse/KAFKA-16222 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.7.0, 3.6.1 >Reporter: Dominik >Priority: Blocker > > We observed that our default user quota does not seem to be migrated correctly. > Before migration: > bin/kafka-configs.sh --describe --all --entity-type users > Quota configs for the *default user-principal* are > consumer_byte_rate=100.0, producer_byte_rate=100.0 > Quota configs for user-principal 'myuser@prod' > are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8 > After migration: > bin/kafka-configs.sh --describe --all --entity-type users > Quota configs for *user-principal ''* are consumer_byte_rate=100.0, > producer_byte_rate=100.0 > Quota configs for user-principal 'myuser%40prod' > are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8 > > Additional finding: our principal names contain an "@", which also leads to an incorrect state after migration (the "@" shows up as "%40").
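The `myuser@prod` to `myuser%40prod` change looks like the principal name ended up in URL-encoded form. The report does not confirm the cause; as an illustration only, standard URL encoding with `java.net.URLEncoder` maps `@` to `%40`, and decoding restores the original name. `PrincipalEncodingDemo` is a hypothetical name, not part of Kafka:

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical demo of the encoding round trip; not Kafka's migration code.
final class PrincipalEncodingDemo {

    // Percent-encode a principal name as java.net.URLEncoder does ('@' becomes "%40").
    static String encode(String name) {
        return URLEncoder.encode(name, StandardCharsets.UTF_8);
    }

    // Reverse the encoding; if a migration skipped this step, "%40" would remain.
    static String decode(String encoded) {
        return URLDecoder.decode(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = encode("myuser@prod");
        System.out.println(encoded);          // myuser%40prod
        System.out.println(decode(encoded));  // myuser@prod
    }
}
```

If encoding is indeed involved, comparing the migrated name with `decode(...)` of the ZooKeeper-side name would be one quick way to confirm it.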
[jira] [Commented] (KAFKA-16232) kafka hangs forever in the starting process if the authorizer future is not returned
[ https://issues.apache.org/jira/browse/KAFKA-16232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822532#comment-17822532 ] Kuan-Po Tseng commented on KAFKA-16232: --- Gentle ping [~showuon]: I'm willing to take over this issue. As you suggested, I'll start with (2) first. Many thanks :)
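KAFKA-16232 suggests (1) a timeout on the wait for the authorizer future and (2) logs before and after that wait. A minimal sketch of both ideas using `CompletableFuture.get(timeout, unit)`; the class `AuthorizerStartupWait`, the method `awaitAclsLoaded` and the log wording are hypothetical, and Kafka's actual broker startup code is organized differently:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

// Hypothetical helper illustrating suggestions (1) and (2) from the issue.
final class AuthorizerStartupWait {
    private static final Logger LOG = Logger.getLogger(AuthorizerStartupWait.class.getName());

    // Returns true if the ACLs finished loading within timeoutMs, false otherwise.
    static boolean awaitAclsLoaded(CompletableFuture<Void> aclsLoaded, long timeoutMs) {
        // Suggestion (2): tell the operator what the broker is blocked on.
        LOG.info("Waiting for the authorizer to finish loading ACLs before serving requests");
        try {
            // Suggestion (1): bound the wait; the right timeout is still an open question.
            aclsLoaded.get(timeoutMs, TimeUnit.MILLISECONDS);
            LOG.info("Authorizer finished loading ACLs");
            return true;
        } catch (TimeoutException e) {
            LOG.severe("Authorizer did not finish loading ACLs within " + timeoutMs
                    + " ms; check that the configured authorizer matches the cluster mode");
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Authorizer failed to load ACLs", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAclsLoaded(CompletableFuture.completedFuture(null), 100));
        // A future that never completes, as when StandardAuthorizer is set on a ZK broker.
        System.out.println(awaitAclsLoaded(new CompletableFuture<>(), 100));
    }
}
```

The sketch only reports a timeout to its caller; whether the broker should then fail startup or keep waiting with periodic logs is left open in the issue.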
[jira] [Assigned] (KAFKA-16031) Enabling testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory for tiered storage after supporting JBOD
[ https://issues.apache.org/jira/browse/KAFKA-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16031: -- Assignee: PoAn Yang
[jira] [Commented] (KAFKA-16031) Enabling testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory for tiered storage after supporting JBOD
[ https://issues.apache.org/jira/browse/KAFKA-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822525#comment-17822525 ] PoAn Yang commented on KAFKA-16031: --- Hi [~showuon], may I take this? Thank you.
[jira] [Resolved] (KAFKA-16071) NPE in testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
[ https://issues.apache.org/jira/browse/KAFKA-16071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16071. --- Resolution: Fixed > NPE in testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress > > > Key: KAFKA-16071 > URL: https://issues.apache.org/jira/browse/KAFKA-16071 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Priority: Major > Labels: newbie, newbie++ > > Found in the CI build result. > > h3. Error Message > java.lang.NullPointerException > h3. Stacktrace > java.lang.NullPointerException at > org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TopicCommandIntegrationTest.java:800) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15095/1/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__zk/
[jira] [Commented] (KAFKA-16071) NPE in testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
[ https://issues.apache.org/jira/browse/KAFKA-16071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822524#comment-17822524 ] Luke Chen commented on KAFKA-16071: --- Closing this now since it may have been fixed by KAFKA-15140.
[jira] [Created] (KAFKA-16318) Add javadoc to KafkaMetric
Mickael Maison created KAFKA-16318: -- Summary: Add javadoc to KafkaMetric Key: KAFKA-16318 URL: https://issues.apache.org/jira/browse/KAFKA-16318 Project: Kafka Issue Type: Bug Components: docs Reporter: Mickael Maison KafkaMetric is part of the public API but it's missing javadoc describing the class and several of its methods.
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
lucasbru merged PR #15445: URL: https://github.com/apache/kafka/pull/15445
Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]
dajac commented on code in PR #15446: URL: https://github.com/apache/kafka/pull/15446#discussion_r1508764392 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1830,6 +1832,63 @@ public void onLoaded() { }); } +/** + * Called when the partition is unloaded. Cancel all existing timers for the group. + * ClassicGroup: Complete all awaiting join futures and sync futures. Transition group to Dead. + */ +public void onUnloaded() { +groups.values().forEach(group -> { +switch (group.type()) { +case CONSUMER: +ConsumerGroup consumerGroup = (ConsumerGroup) group; +log.info("[GroupId={}] Unloading group metadata for group epoch {}.", +consumerGroup.groupId(), consumerGroup.groupEpoch()); + +consumerGroup.members().values().forEach(member -> { + timer.cancel(consumerGroupSessionTimeoutKey(consumerGroup.groupId(), member.memberId())); + timer.cancel(consumerGroupRevocationTimeoutKey(consumerGroup.groupId(), member.memberId())); Review Comment: Cancelling the timers is not necessary here because all the timers of the shard are already cancelled when we reach this point. Check the unload method in the coordinator context. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1830,6 +1832,63 @@ public void onLoaded() { }); } +/** + * Called when the partition is unloaded. Cancel all existing timers for the group. + * ClassicGroup: Complete all awaiting join futures and sync futures. Transition group to Dead. 
+ */ +public void onUnloaded() { +groups.values().forEach(group -> { +switch (group.type()) { +case CONSUMER: +ConsumerGroup consumerGroup = (ConsumerGroup) group; +log.info("[GroupId={}] Unloading group metadata for group epoch {}.", +consumerGroup.groupId(), consumerGroup.groupEpoch()); + +consumerGroup.members().values().forEach(member -> { + timer.cancel(consumerGroupSessionTimeoutKey(consumerGroup.groupId(), member.memberId())); + timer.cancel(consumerGroupRevocationTimeoutKey(consumerGroup.groupId(), member.memberId())); +}); + +break; +case CLASSIC: +ClassicGroup classicGroup = (ClassicGroup) group; +log.info("[GroupId={}] Unloading group metadata for generation {}.", +classicGroup.groupId(), classicGroup.generationId()); + +classicGroup.transitionTo(DEAD); +switch (classicGroup.previousState()) { +case EMPTY: +case DEAD: +break; +case PREPARING_REBALANCE: +classicGroup.allMembers().forEach(member -> { +classicGroup.completeJoinFuture(member, new JoinGroupResponseData() +.setMemberId(member.memberId()) +.setMembers(Collections.emptyList()) +.setGenerationId(NO_GENERATION) +.setErrorCode(NOT_COORDINATOR.code())); + + timer.cancel(classicGroupHeartbeatKey(classicGroup.groupId(), member.memberId())); +}); + + timer.cancel(classicGroupJoinKey(classicGroup.groupId())); +break; +case COMPLETING_REBALANCE: +case STABLE: +classicGroup.allMembers().forEach(member -> { +classicGroup.completeSyncFuture(member, new SyncGroupResponseData() +.setAssignment(EMPTY_ASSIGNMENT) Review Comment: The assignment is empty by default too. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1830,6 +1832,63 @@ public void onLoaded() { }); } +/** + * Called when the partition is unloaded. Cancel all existing timers for the group. + * ClassicGroup: Complete all awaiting join futures and sync futures. Transition group to Dead. 
+ */ +public void onUnloaded() { +groups.values().forEach(group -> { +switch (group.type()) { +case CONSUMER: +ConsumerGroup consumerGroup = (ConsumerGroup) group; +log.info("[GroupId={}] Unloading group metadata for group epoch {}.", +
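For classic groups in `PREPARING_REBALANCE`, the diff completes every pending JoinGroup future with `NOT_COORDINATOR` when the partition is unloaded, so clients go looking for the new coordinator. A simplified sketch of that pattern under stated assumptions: `UnloadSketch`, `ErrorCode` and `JoinResponse` are hypothetical stand-ins for the real group-coordinator classes, the `-1` value for `NO_GENERATION` is assumed, and, following the first review comment, no timers are cancelled:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a classic group whose members are waiting
// for JoinGroup responses; these are not the real group-coordinator classes.
final class UnloadSketch {
    enum ErrorCode { NONE, NOT_COORDINATOR }

    // Assumed sentinel for "no generation", standing in for NO_GENERATION in the diff.
    static final int NO_GENERATION = -1;

    // Only the fields that the diff sets when completing a join future on unload.
    static final class JoinResponse {
        final String memberId;
        final int generationId;
        final ErrorCode error;

        JoinResponse(String memberId, int generationId, ErrorCode error) {
            this.memberId = memberId;
            this.generationId = generationId;
            this.error = error;
        }
    }

    private final Map<String, CompletableFuture<JoinResponse>> pendingJoins = new LinkedHashMap<>();

    // Registers (or returns) the future a member is blocked on during a rebalance.
    CompletableFuture<JoinResponse> awaitJoin(String memberId) {
        return pendingJoins.computeIfAbsent(memberId, id -> new CompletableFuture<JoinResponse>());
    }

    // Completes every pending join with NOT_COORDINATOR. Timers are not cancelled
    // here, assuming the coordinator context has already cancelled them.
    void onUnloaded() {
        pendingJoins.forEach((memberId, future) ->
                future.complete(new JoinResponse(memberId, NO_GENERATION, ErrorCode.NOT_COORDINATOR)));
        pendingJoins.clear();
    }

    public static void main(String[] args) {
        UnloadSketch group = new UnloadSketch();
        CompletableFuture<JoinResponse> join = group.awaitJoin("member-1");
        group.onUnloaded();
        System.out.println(join.join().error); // NOT_COORDINATOR
    }
}
```

The same shape applies to pending SyncGroup futures in `COMPLETING_REBALANCE` and `STABLE`; as the second review comment notes, the real response's assignment is already empty by default.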
Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]
dajac commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1508759431 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3500,6 +3501,42 @@ public void maybeDeleteGroup(String groupId, List records) { } } +/** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ +private boolean validateOfflineUpgrade(String groupId) { +Group group = groups.get(groupId); +return group != null && group.type() == CLASSIC && group.isEmpty(); +} + +/** + * A group can be downgraded offline if it's a consumer group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline downgrade is valid. + */ +private boolean validateOfflineDowngrade(String groupId) { +Group group = groups.get(groupId); +return group != null && group.type() == CONSUMER && group.isEmpty(); +} + +/** + * Upgrade/Downgrade the empty group if it's valid. + * + * @param groupId The group id to be migrated. + * @param records The list of records to delete the previous group. + */ +public void maybeMigrateEmptyGroup(String groupId, List records, boolean isUpgrade) { +if ((isUpgrade && validateOfflineUpgrade(groupId)) || +(!isUpgrade && validateOfflineDowngrade(groupId))) { +deleteGroup(groupId, records); +removeGroup(groupId); Review Comment: Do we actually replay records in this particular case? The replay handling for the classic protocol is a bit different from all the other ones. See here: https://github.com/apache/kafka/pull/15442/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R1978. When the coordinator result contains an `appendFuture`, the write operation skips the replay, if I remember correctly.
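The validation rules in the diff reduce to a simple predicate: an offline upgrade needs an existing, empty classic group, and an offline downgrade needs an existing, empty consumer group. A self-contained sketch with hypothetical stand-in types (`OfflineMigrationCheck`, `GroupType`, `Group`); it models only the check, not the record deletion or the replay question raised in the review:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the checks in the diff: only the group type
// and emptiness matter for an offline protocol migration.
final class OfflineMigrationCheck {
    enum GroupType { CLASSIC, CONSUMER }

    static final class Group {
        final GroupType type;
        final boolean empty;

        Group(GroupType type, boolean empty) {
            this.type = type;
            this.empty = empty;
        }
    }

    private final Map<String, Group> groups = new HashMap<>();

    void put(String groupId, Group group) {
        groups.put(groupId, group);
    }

    // Upgrade (classic -> consumer protocol): only an existing, empty classic group qualifies.
    boolean validateOfflineUpgrade(String groupId) {
        Group group = groups.get(groupId);
        return group != null && group.type == GroupType.CLASSIC && group.empty;
    }

    // Downgrade (consumer -> classic protocol): only an existing, empty consumer group qualifies.
    boolean validateOfflineDowngrade(String groupId) {
        Group group = groups.get(groupId);
        return group != null && group.type == GroupType.CONSUMER && group.empty;
    }

    // Mirrors the condition in maybeMigrateEmptyGroup; the real method then appends
    // the records that delete the previous group and removes it.
    boolean canMigrate(String groupId, boolean isUpgrade) {
        return isUpgrade ? validateOfflineUpgrade(groupId) : validateOfflineDowngrade(groupId);
    }
}
```

Usage: after `put("g", new OfflineMigrationCheck.Group(OfflineMigrationCheck.GroupType.CLASSIC, true))`, `canMigrate("g", true)` is true and `canMigrate("g", false)` is false.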
Re: [PR] KAFKA-16237: Add Scala 3 support for Kafka Streams [kafka]
altrack commented on PR #15338: URL: https://github.com/apache/kafka/pull/15338#issuecomment-1972858044 @mberndt123 one possible solution may be to piggyback the Scala 3-related step on the Scala 2.13 build; it will be skipped when `scalaVersion=2.12`. This would only be until we have a full Scala 3 release. It can be done by adding an additional project called `:streams:streams-scala3`, which depends on `:streams:streams-scala` and can be excluded with `-x :streams:streams-scala3`. No change to gradlewAll or the Jenkinsfile is required. So, by default and with `scalaVersion=2.13`, the build will produce two artifacts, one for 2.13 and one for 3. Also, in this case I don't see a benefit in having multiple Scala 3 versions; as long as they are LTS, it should just work.
[jira] [Updated] (KAFKA-16222) Incorrect default user-principal quota after migration
[ https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dominik updated KAFKA-16222: Affects Version/s: 3.7.0
[jira] [Updated] (KAFKA-16222) Incorrect default user-principal quota after migration
[ https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dominik updated KAFKA-16222: Priority: Blocker (was: Major)
[PR] Add Deletion Reasons in KRaft Snapshot Deletion Logging [kafka]
hni61223 opened a new pull request, #15450: URL: https://github.com/apache/kafka/pull/15450 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: move TimeRatio to common.metrics.stats package [kafka]
dajac commented on PR #15447: URL: https://github.com/apache/kafka/pull/15447#issuecomment-1972748553 I am not sure about this one. `org.apache.kafka.common.metrics.stats` is part of the public API, so moving `TimeRatio` there would effectively make it part of our public API, and we would need a KIP for this. Have you considered moving it to `server-common`?