[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r634023546

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java ##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+
+public final class TopicBasedRemoteLogMetadataManagerConfig {
+    private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfig.class.getName());
+
+    public static final String REMOTE_LOG_METADATA_TOPIC_NAME = "__remote_log_metadata";
+
+    public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor";
+    public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions";
+    public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_PROP = "remote.log.metadata.topic.retention.ms";
+    public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = "remote.log.metadata.publish.wait.ms";
+
+    public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
+    public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS = -1L;
+    public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
+    public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 60 * 1000L;
+
+    public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor of remote log metadata Topic.";
+    public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC = "The number of partitions for remote log metadata Topic.";
+    public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_DOC = "Remote log metadata topic log retention in milli seconds." +
+            "Default: -1, that means unlimited. Users can configure this value based on their use cases. " +
+            "To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " +
+            "tiered storage in the cluster.";
+    public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milli seconds to wait for the local consumer to " +
+            "receive the published event.";
+
+    public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = "remote.log.metadata.common.client.";
+    public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = "remote.log.metadata.producer.";
+    public static final String REMOTE_LOG_METADATA_CONSUMER_PREFIX = "remote.log.metadata.consumer.";
+
+    private static final String REMOTE_LOG_METADATA_CLIENT_PREFIX = "__remote_log_metadata_client";
+    private static final String BROKER_ID = "broker.id";
+
+    private static final ConfigDef CONFIG = new ConfigDef();
+    static {
+        CONFIG.define(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, INT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR, atLeast(1), LOW,

Review comment:
   `Config` definition will be moved to `KafkaConfig` later when default RLMM is integrated with the broker.

-- 
This is an automated message from the Apache
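The three `*_PREFIX` constants above suggest that shared client settings, producer settings, and consumer settings are passed under separate key prefixes. A minimal sketch of how such prefixed keys could be split into plain client configs, assuming (this is not shown in the diff) that producer- or consumer-specific keys override common keys of the same name; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: turns prefixed remote-log-metadata
// settings into plain client configs.
final class PrefixedClientConfigs {
    static final String COMMON_PREFIX = "remote.log.metadata.common.client.";
    static final String PRODUCER_PREFIX = "remote.log.metadata.producer.";
    static final String CONSUMER_PREFIX = "remote.log.metadata.consumer.";

    // Returns the entries whose key starts with the prefix, with the prefix stripped.
    static Map<String, Object> withPrefixStripped(Map<String, ?> props, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    // Common settings are applied first and client-specific settings second,
    // so the latter win on conflicts (an assumed precedence).
    static Map<String, Object> clientConfigs(Map<String, ?> props, String clientPrefix) {
        Map<String, Object> result = withPrefixStripped(props, COMMON_PREFIX);
        result.putAll(withPrefixStripped(props, clientPrefix));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("remote.log.metadata.common.client.bootstrap.servers", "localhost:9092");
        props.put("remote.log.metadata.common.client.client.id", "rlmm");
        props.put("remote.log.metadata.producer.client.id", "rlmm-producer");
        props.put("remote.log.metadata.consumer.max.poll.records", 100);
        // Producer view: bootstrap.servers from the common prefix, client.id overridden.
        System.out.println(clientConfigs(props, PRODUCER_PREFIX));
    }
}
```

Keeping the merge order explicit in one method makes the precedence easy to change if the real integration with `KafkaConfig` chooses differently.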
[GitHub] [kafka] vamossagar12 commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.
vamossagar12 commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-842807268

   Thanks @ableegoldman, I have updated the main KIP page, the Streams KIP sub-page, and the actual KIP. Thanks for your support!

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.
[ https://issues.apache.org/jira/browse/KAFKA-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346566#comment-17346566 ]

Ki Lok Wong commented on KAFKA-12804:
-------------------------------------

Yep, I will do it later this arvo. Cheers.

> In-Memory state store backed by InMemoryKeyValueStore is slow due to
> synchronizing reads/writes using a single lock.
>
>                 Key: KAFKA-12804
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12804
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.2
>            Reporter: Ki Lok Wong
>            Assignee: Ki Lok Wong
>            Priority: Minor
>              Labels: performance, pull-request-available
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Default In-Memory state store (InMemoryKeyValueStore) uses TreeMap as the
> backing map implementation. Read/write methods such as get() and put() are
> synchronized on the same lock hence significantly reducing multi-thread
> performance.
> https://github.com/a0x8o/kafka/blob/f834be5e73e46805b29df9845bf09be98f91fc09/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L78

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] showuon commented on pull request #10715: KAFKA-9295: increase heartbeat and session timeout
showuon commented on pull request #10715:
URL: https://github.com/apache/kafka/pull/10715#issuecomment-842799201

   @ableegoldman, as discussed, could you help review this PR, which increases the session timeout? Thanks.
[jira] [Assigned] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.
[ https://issues.apache.org/jira/browse/KAFKA-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ki Lok Wong reassigned KAFKA-12804:
-----------------------------------

    Assignee: Ki Lok Wong
[jira] [Commented] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.
[ https://issues.apache.org/jira/browse/KAFKA-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346554#comment-17346554 ]

A. Sophie Blee-Goldman commented on KAFKA-12804:
------------------------------------------------

Hey [~gorsonw], thanks for the ticket; this class certainly has not been optimized much (or at all). I see you applied the "pull request available" label to this. Can you include the link to your PR in the ticket, and assign it to yourself if you plan to work on this?
[GitHub] [kafka] showuon opened a new pull request #10715: KAFKA-9295: increase heartbeat and session timeout
showuon opened a new pull request #10715:
URL: https://github.com/apache/kafka/pull/10715

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.
[ https://issues.apache.org/jira/browse/KAFKA-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ki Lok Wong updated KAFKA-12804:
--------------------------------
    Remaining Estimate: 2h  (was: 2m)
     Original Estimate: 2h  (was: 2m)
[jira] [Created] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.
Ki Lok Wong created KAFKA-12804:
-----------------------------------

             Summary: In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.
                 Key: KAFKA-12804
                 URL: https://issues.apache.org/jira/browse/KAFKA-12804
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.6.2
            Reporter: Ki Lok Wong


The default in-memory state store (InMemoryKeyValueStore) uses a TreeMap as the backing map implementation. Read/write methods such as get() and put() are synchronized on the same lock, which significantly reduces multi-threaded performance.

https://github.com/a0x8o/kafka/blob/f834be5e73e46805b29df9845bf09be98f91fc09/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L78
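To make the contention concrete, here is a minimal sketch contrasting the pattern the ticket describes (one lock around a TreeMap) with one possible alternative, a `java.util.concurrent.ConcurrentSkipListMap`, which stays sorted (so range scans still work) while allowing concurrent reads and writes without a global lock. This is an illustration under stated assumptions only: `SimpleSortedStore` and the store classes are hypothetical and much simpler than Kafka Streams' `KeyValueStore`, and the ticket does not say which fix the linked PR adopts.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class InMemoryStoreSketch {
    public static void main(String[] args) throws InterruptedException {
        SimpleSortedStore store = new SkipListStore();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int id = t;
            // Each writer inserts its own keys; no external lock is needed.
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    store.put("w" + id + "-" + i, "v");
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            w.join();
        }
        // Keys of writer 0 all fall in ["w0-", "w0.") because '-' < '.'.
        System.out.println(store.range("w0-", "w0.").size()); // 1000
    }
}

// Hypothetical minimal store interface, not the Kafka Streams API.
interface SimpleSortedStore {
    void put(String key, String value);
    String get(String key);
    // Returns a copy of the entries with from <= key < to.
    NavigableMap<String, String> range(String from, String to);
}

// The pattern described in the ticket: one lock guards every read and write.
final class LockedTreeMapStore implements SimpleSortedStore {
    private final TreeMap<String, String> map = new TreeMap<>();
    public synchronized void put(String key, String value) { map.put(key, value); }
    public synchronized String get(String key) { return map.get(key); }
    public synchronized NavigableMap<String, String> range(String from, String to) {
        return new TreeMap<>(map.subMap(from, true, to, false));
    }
}

// Alternative: a sorted, thread-safe map with no global lock.
final class SkipListStore implements SimpleSortedStore {
    private final ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();
    public void put(String key, String value) { map.put(key, value); }
    public String get(String key) { return map.get(key); }
    public NavigableMap<String, String> range(String from, String to) {
        // Iteration over a skip-list view is weakly consistent, not a point-in-time snapshot.
        return new TreeMap<>(map.subMap(from, true, to, false));
    }
}
```

The trade-off worth noting is in the last comment: dropping the single lock also drops the snapshot-like consistency that the locked range scan gives, which a real store would need to account for.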
[GitHub] [kafka] showuon commented on pull request #10471: KAFKA-12597: remove deprecated zookeeper option in ReassignPartitionsCommand
showuon commented on pull request #10471:
URL: https://github.com/apache/kafka/pull/10471#issuecomment-842772724

   @ijuma, could you take another look at this PR? Thanks.
[GitHub] [kafka] showuon commented on pull request #10665: KAFKA-9009: increase replica.lag.time.max.ms to make the test reliable
showuon commented on pull request #10665:
URL: https://github.com/apache/kafka/pull/10665#issuecomment-842772461

   @edoardocomar @mimaison, could you help review this PR, which makes the test reliable? Thank you.
[GitHub] [kafka] abbccdda commented on pull request #9311: KAFKA-9910: Implement new transaction timed out error
abbccdda commented on pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#issuecomment-842758541

   retest this please
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633958458

## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ##
@@ -936,21 +944,13 @@ private QuorumController(LogContext logContext,
         this.replicationControl = new ReplicationControlManager(snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, configurationControl, clusterControl, controllerMetrics);
-        this.logManager = logManager;
+        this.raftClient = raftClient;
         this.metaLogListener = new QuorumMetaLogListener();
-        this.curClaimEpoch = -1L;
-        this.lastCommittedOffset = snapshotReader.epoch();
+        this.curClaimEpoch = -1;
+        this.lastCommittedOffset = -1L;
         this.writeOffset = -1L;
-        while (snapshotReader.hasNext()) {
-            List batch = snapshotReader.next();
-            long index = 0;
-            for (ApiMessage message : batch) {
-                replay(message, snapshotReader.epoch(), index++);
-            }
-        }
-        snapshotRegistry.createSnapshot(lastCommittedOffset);

Review comment:
   Okay. I didn't want to do this as part of the merge commit, but I'll add it back as a separate commit.
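The removed block replays every record in the snapshot, passing each record's epoch and its index within its batch (the index restarts at 0 for every batch). A stripped-down sketch of that loop, assuming a hypothetical `Replayer` callback and a plain `Iterator<List<T>>` in place of the real snapshot reader:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class SnapshotReplay {
    // Hypothetical callback shaped like replay(message, epoch, index) in the diff.
    interface Replayer<T> {
        void replay(T message, long epoch, long index);
    }

    // Replays every message of every batch; the index is per batch,
    // mirroring the removed while/for loop.
    static <T> void replayAll(Iterator<List<T>> batches, long epoch, Replayer<T> replayer) {
        while (batches.hasNext()) {
            List<T> batch = batches.next();
            long index = 0;
            for (T message : batch) {
                replayer.replay(message, epoch, index++);
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        List<String> seen = new ArrayList<>();
        replayAll(batches.iterator(), 5L, (m, e, i) -> seen.add(m + "@" + e + ":" + i));
        System.out.println(seen); // [a@5:0, b@5:1, c@5:0]
    }
}
```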
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633957907

## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala ##
@@ -234,22 +259,20 @@ class BrokerMetadataListener(brokerId: Int,
       clientQuotaManager.handleQuotaRecord(record)
     }
 
-  class HandleNewLeaderEvent(leader: MetaLogLeader)
+  class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
       extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
       val imageBuilder = MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
-      if (leader.nodeId() < 0) {
-        imageBuilder.controllerId(None)
-      } else {
-        imageBuilder.controllerId(Some(leader.nodeId()))
-      }
+      imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
       metadataCache.image(imageBuilder.build())
     }
   }
 
-  override def handleNewLeader(leader: MetaLogLeader): Unit = {
-    eventQueue.append(new HandleNewLeaderEvent(leader))
+  override def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
+    if (leader.isLeader(brokerId)) {
+      eventQueue.append(new HandleNewLeaderEvent(leader))

Review comment:
   Yeah, I think you are correct. We should always send leadership changes to the broker handlers and update the cache accordingly.
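The agreed behavior is that every leadership change, not only the ones where this broker is the leader, reaches the handler, so the cached controller ID is updated (or cleared) each time. A Java sketch of that idea, using hypothetical minimal stand-ins for `LeaderAndEpoch` and the metadata cache rather than the real Kafka classes; the `OptionalInt` leader ID mirrors the `LeaderAndEpoch(OptionalInt leaderId, int epoch)` constructor visible elsewhere in this thread:

```java
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical listener that caches the current controller ID.
final class ControllerIdTracker {
    private Optional<Integer> controllerId = Optional.empty();

    // Applied on every leader change, including ones that report no leader,
    // so a stale controller ID is cleared rather than kept.
    void handleLeaderChange(LeaderAndEpochSketch leader) {
        controllerId = leader.leaderId.isPresent()
            ? Optional.of(leader.leaderId.getAsInt())
            : Optional.empty();
    }

    Optional<Integer> controllerId() {
        return controllerId;
    }

    public static void main(String[] args) {
        ControllerIdTracker tracker = new ControllerIdTracker();
        tracker.handleLeaderChange(new LeaderAndEpochSketch(OptionalInt.of(1), 3));
        System.out.println(tracker.controllerId()); // Optional[1]
        tracker.handleLeaderChange(new LeaderAndEpochSketch(OptionalInt.empty(), 4));
        System.out.println(tracker.controllerId()); // Optional.empty
    }
}

// Minimal stand-in for org.apache.kafka.raft.LeaderAndEpoch.
final class LeaderAndEpochSketch {
    final OptionalInt leaderId;
    final int epoch;

    LeaderAndEpochSketch(OptionalInt leaderId, int epoch) {
        this.leaderId = leaderId;
        this.epoch = epoch;
    }
}
```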
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633956946

## File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java ##
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
         this.epoch = epoch;
     }

Review comment:
   Fixed.
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633956837

## File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java ##
@@ -57,4 +60,9 @@
      */
     @Override
     void close();
+
+    static BatchReader singleton(Batch batch) {

Review comment:
   I removed this method. I think it is a dangerous method to have in `BatchReader`, since batch readers created this way do not integrate with the rest of the `KafkaRaftClient`. This method was only used in tests, so I modified those cases to use `MemoryBatchReader` directly.
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633956073

## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ##
@@ -202,7 +200,7 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) {
         @SuppressWarnings("unchecked")
         public QuorumController build() throws Exception {
-            if (logManager == null) {
+            if (raftClient == null) {
                 throw new RuntimeException("You must set a metadata log manager.");

Review comment:
   Fixed.
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633956010

## File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java ##
@@ -218,7 +226,7 @@ synchronized void electLeaderIfNeeded() {
     /**
      * The node ID of this local log manager. Each log manager must have a unique ID.
      */
-    private final int nodeId;
+    public final int nodeId;

Review comment:
   Fixed.
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633940297

## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ##
@@ -2261,6 +2260,11 @@ private Long append(int epoch, List records, boolean isAtomic) {
         return shutdownComplete;
     }
 
+    @Override
+    public void resign(int epoch) {
+        throw new UnsupportedOperationException();

Review comment:
   Yes. We have an issue for this: https://issues.apache.org/jira/browse/KAFKA-12631
[jira] [Created] (KAFKA-12803) Support reassigning partitions when in KRaft mode
Colin McCabe created KAFKA-12803:
------------------------------------

             Summary: Support reassigning partitions when in KRaft mode
                 Key: KAFKA-12803
                 URL: https://issues.apache.org/jira/browse/KAFKA-12803
             Project: Kafka
          Issue Type: Improvement
          Components: controller
    Affects Versions: 2.8.0
            Reporter: Colin McCabe
            Assignee: Colin McCabe
             Fix For: 3.0.0


Support reassigning partitions when in KRaft mode
[jira] [Updated] (KAFKA-12788) Improve KRaft replica placement
[ https://issues.apache.org/jira/browse/KAFKA-12788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe updated KAFKA-12788:
---------------------------------
    Affects Version/s: 2.8.0

> Improve KRaft replica placement
> -------------------------------
>
>                 Key: KAFKA-12788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12788
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 2.8.0
>            Reporter: Colin McCabe
>            Assignee: Colin McCabe
>            Priority: Major
>              Labels: kip-500
>             Fix For: 3.0.0
>
>
> Implement the existing Kafka replica placement algorithm for KRaft.
> This also means implementing rack awareness. Previously, we just chose
> replicas randomly in a non-rack-aware fashion. Also, allow replicas to
> be placed on fenced brokers if there are no other choices. This was
> specified in KIP-631 but previously not implemented.
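The last requirement (use fenced brokers only when there are no other choices) can be sketched on a single rack as follows. This is a simplified illustration, not the StripedReplicaPlacer algorithm: it ignores racks, shuffling, and per-partition rotation, and it throws `IllegalArgumentException` where the real placer throws `InvalidReplicationFactorException`. The `Broker` type is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FencedFallbackPlacer {
    // Hypothetical broker description: an ID plus a fenced flag.
    static final class Broker {
        final int id;
        final boolean fenced;

        Broker(int id, boolean fenced) {
            this.id = id;
            this.fenced = fenced;
        }
    }

    // Picks unfenced brokers first (in the given order) and falls back to fenced
    // brokers only if there are not enough unfenced ones for the replication factor.
    static List<Integer> place(List<Broker> brokers, int replicationFactor) {
        if (replicationFactor <= 0) {
            throw new IllegalArgumentException("Invalid replication factor " + replicationFactor);
        }
        if (replicationFactor > brokers.size()) {
            throw new IllegalArgumentException("Only " + brokers.size() + " broker(s) available");
        }
        List<Integer> replicas = new ArrayList<>();
        for (Broker b : brokers) {
            if (!b.fenced && replicas.size() < replicationFactor) replicas.add(b.id);
        }
        for (Broker b : brokers) {
            if (b.fenced && replicas.size() < replicationFactor) replicas.add(b.id);
        }
        return replicas;
    }

    public static void main(String[] args) {
        List<Broker> brokers = Arrays.asList(
            new Broker(3, false), new Broker(1, true), new Broker(2, false));
        System.out.println(place(brokers, 2)); // [3, 2]
        System.out.println(place(brokers, 3)); // [3, 2, 1]
    }
}
```

Because fenced brokers are only appended after all unfenced ones, a fenced broker never displaces an available unfenced one.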
[jira] [Resolved] (KAFKA-12788) Improve KRaft replica placement
[ https://issues.apache.org/jira/browse/KAFKA-12788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-12788.
----------------------------------
    Fix Version/s: 3.0.0
         Reviewer: Jun Rao
       Resolution: Fixed
[GitHub] [kafka] cmccabe merged pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe merged pull request #10494:
URL: https://github.com/apache/kafka/pull/10494
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633935466

## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ##
@@ -56,24 +57,17 @@
     void handleSnapshot(SnapshotReader reader);
 
     /**

Review comment:
   > Or you could choose to burn an epoch like this...

   This is not how leader election works in Raft. When a leader fails or steps down, a new epoch starts without a leader, only candidate(s). If leader election succeeds for a given epoch, exactly one leader is elected for that epoch, and it remains leader for the duration of that epoch.
[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633930529

## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ##
@@ -56,24 +57,17 @@
     void handleSnapshot(SnapshotReader reader);
 
     /**

Review comment:
   I'll update the documentation, but in Raft, epochs are not guaranteed to have a leader. If there is a leader for an epoch, then there is one and only one leader. This means that the client could see:
   ```
   handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
   handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
   ```
   Or this, for that matter:
   ```
   handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=2))
   handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=2))
   ```
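The rules in this comment can be written down as a small checker over a sequence of `handleLeaderChange` notifications: epochs never decrease, an epoch may report no leader (`nodeId=-1`), and an epoch never reports two different leaders. The sketch below is hypothetical and uses `{nodeId, epoch}` int pairs instead of the real `LeaderAndEpoch` class; it checks only these two properties, not every Raft invariant.

```java
import java.util.Arrays;
import java.util.List;

final class LeaderChangeChecker {
    static final int NO_LEADER = -1;

    // Each element is {nodeId, epoch}; nodeId == -1 means "no leader known".
    // Returns true if epochs never decrease and no epoch reports two different leaders.
    static boolean isValid(List<int[]> notifications) {
        int currentEpoch = Integer.MIN_VALUE;
        int leaderInEpoch = NO_LEADER;
        for (int[] n : notifications) {
            int nodeId = n[0];
            int epoch = n[1];
            if (epoch < currentEpoch) {
                return false; // epochs must not go backwards
            }
            if (epoch > currentEpoch) {
                currentEpoch = epoch; // new epoch: leader unknown until reported
                leaderInEpoch = NO_LEADER;
            }
            if (nodeId != NO_LEADER) {
                if (leaderInEpoch != NO_LEADER && leaderInEpoch != nodeId) {
                    return false; // at most one leader per epoch
                }
                leaderInEpoch = nodeId;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The second sequence from the comment above: an epoch with no leader is fine.
        System.out.println(isValid(Arrays.asList(
            new int[] {0, 0}, new int[] {-1, 1}, new int[] {-1, 2}, new int[] {1, 2}))); // true
        // Two different leaders in epoch 1 would violate the invariant.
        System.out.println(isValid(Arrays.asList(new int[] {0, 1}, new int[] {2, 1}))); // false
    }
}
```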
[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r633929510 ## File path: metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList; +import org.apache.kafka.controller.StripedReplicaPlacer.RackList; +import org.apache.kafka.metadata.UsableBroker; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +@Timeout(value = 40) +public class StripedReplicaPlacerTest { +/** + * Test that the BrokerList class works as expected. 
+ */ +@Test +public void testBrokerList() { +assertEquals(0, BrokerList.EMPTY.size()); +assertEquals(-1, BrokerList.EMPTY.next(1)); +BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3); +assertEquals(4, brokers.size()); +assertEquals(0, brokers.next(0)); +assertEquals(1, brokers.next(0)); +assertEquals(2, brokers.next(0)); +assertEquals(3, brokers.next(0)); +assertEquals(-1, brokers.next(0)); +assertEquals(-1, brokers.next(0)); +assertEquals(1, brokers.next(1)); +assertEquals(2, brokers.next(1)); +assertEquals(3, brokers.next(1)); +assertEquals(0, brokers.next(1)); +assertEquals(-1, brokers.next(1)); +} + +/** + * Test that we perform striped replica placement as expected, and don't use the + * fenced replica if we don't have to. + */ +@Test +public void testAvoidFencedReplicaIfPossibleOnSingleRack() { +MockRandom random = new MockRandom(); +RackList rackList = new RackList(random, Arrays.asList( +new UsableBroker(3, Optional.empty(), false), +new UsableBroker(1, Optional.empty(), true), +new UsableBroker(0, Optional.empty(), false), +new UsableBroker(4, Optional.empty(), false), +new UsableBroker(2, Optional.empty(), false)).iterator()); +assertEquals(5, rackList.numTotalBrokers()); +assertEquals(4, rackList.numUnfencedBrokers()); +assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames()); +assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0)); +assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1)); +assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4)); +assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4)); +assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4)); +assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4)); +assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4)); +} + +/** + * Test that we will place on the fenced replica if we need to. 
+ */ +@Test +public void testPlacementOnFencedReplicaOnSingleRack() { +MockRandom random = new MockRandom(); +RackList rackList = new RackList(random, Arrays.asList( +new UsableBroker(3, Optional.empty(), false), +new UsableBroker(1, Optional.empty(), true), +new UsableBroker(2, Optional.empty(), false)).iterator()); +assertEquals(3, rackList.numTotalBrokers()); +assertEquals(2, rackList.numUnfencedBrokers()); +assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames()); +assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); +assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); +assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); +assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); +} + +@Test +public void
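`testBrokerList` asserts specific `next(epoch)` semantics: a new epoch restarts the scan one position further along, and `-1` marks exhaustion. The sketch below reproduces them. `RoundRobinBrokerList` is a hypothetical stand-in, not the `StripedReplicaPlacer.BrokerList` from the PR. It assumes the starting offset begins at 0 and advances by one on each new epoch, which is one way to reproduce the asserted sequence.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for epoch-based round-robin broker selection, modeled on the
// sequence asserted in testBrokerList. Not the actual StripedReplicaPlacer.BrokerList.
class RoundRobinBrokerList {
    private final List<Integer> brokers = new ArrayList<>();
    private int epoch = 0;   // epoch of the placement currently being served
    private int index = 0;   // how many brokers were handed out in this epoch
    private int offset = 0;  // starting position for this epoch (assumed to start at 0)

    RoundRobinBrokerList add(int broker) {
        brokers.add(broker);
        return this;
    }

    int size() {
        return brokers.size();
    }

    // Returns the next broker for this epoch, or -1 once every broker has been returned.
    // Seeing a different epoch resets the scan and shifts the starting offset by one.
    int next(int epoch) {
        if (brokers.isEmpty()) {
            return -1;
        }
        if (this.epoch != epoch) {
            this.epoch = epoch;
            this.index = 0;
            this.offset = (offset + 1) % brokers.size();
        }
        if (index >= brokers.size()) {
            return -1;
        }
        int broker = brokers.get((offset + index) % brokers.size());
        index++;
        return broker;
    }
}
```

With brokers 0..3 this yields 0, 1, 2, 3, -1 for epoch 0 and then 1, 2, 3, 0, -1 for epoch 1, matching the assertions in the test above.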
[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r633926611 ## File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java ## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.metadata.OptionalStringComparator; +import org.apache.kafka.metadata.UsableBroker; + + +/** + * The striped replica placer. + * + * + * GOALS + * The design of this placer attempts to satisfy a few competing goals. Firstly, we want + * to spread the replicas as evenly as we can across racks. In the simple case where + * broker racks have not been configured, this goal is a no-op, of course. But it is the + * highest priority goal in multi-rack clusters. + * + * Our second goal is to spread the replicas evenly across brokers. 
Since we are placing + * multiple partitions, we try to avoid putting each partition on the same set of + * replicas, even if it does satisfy the rack placement goal. However, we treat the rack + * placement goal as higher priority than this goal-- if you configure 10 brokers in rack + * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one + * broker in rack C. If you were to place a lot of partitions with replication factor 3, + * each partition would try to get a replica there. In general racks are supposed to be + * about the same size -- if they aren't, this is a user error. + * + * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced + * brokers. + * + * + * CONSTRAINTS + * In addition to these goals, we have two constraints. Unlike the goals, these are not + * optional -- they are mandatory. Placement will fail if a constraint cannot be + * satisfied. The first constraint is that we can't place more than one replica on the + * same broker. This imposes an upper limit on replication factor-- for example, a 3-node + * cluster can't have any topics with replication factor 4. This constraint comes from + * Kafka's internal design. + * + * The second constraint is that the leader of each partition must be an unfenced broker. + * This constraint is a bit arbitrary. In theory, we could allow people to create + * new topics even if every broker were fenced. However, this would be confusing for + * users. + * + * + * ALGORITHM + * The StripedReplicaPlacer constructor loads the broker data into rack objects. Each + * rack object contains a sorted list of fenced brokers, and a separate sorted list of + * unfenced brokers. The racks themselves are organized into a sorted list, stored inside + * the top-level RackList object. + * + * The general idea is that we place replicas on to racks in a round-robin fashion. 
So if + * we had racks A, B, C, and D, and we were creating a new partition with replication + * factor 3, our first replica might come from A, our second from B, and our third from C. + * Of course our placement would not be very fair if we always started with rack A. + * Therefore, we generate a random starting offset when the RackList is created. So one + * time we might go B, C, D. Another time we might go C, D, A. And so forth. + * + * Note that each partition we generate advances the starting offset by one. + * So in our 4-rack cluster, with 3 partitions, we might choose these racks: + * + * partition 1: A, B, C + * partition 2: B, C, A + * partition 3: C, A, B + * + * This is what generates the characteristic "striped" pattern of this placer. + * + * So far I haven't said anything about how we choose a replica from within a rack. In + * fact, this is also done in a round-robin fashion. So if rack A had replica A0, A1, A2, + * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on. + * Just like with the racks, we add a random
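The following minimal sketch shows the rack-level striping that the Javadoc describes. It uses three racks so that the rack order comes out as A, B, C / B, C, A / C, A, B. `StripedPlacementSketch` is hypothetical and differs from the PR in several ways: it takes an explicit start offset instead of a `Random`, throws `IllegalArgumentException` where the PR uses `InvalidReplicationFactorException`, and does not model fenced brokers or the unfenced-leader constraint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: racks are visited round-robin from an offset that advances by one
// per placed partition; brokers inside each rack are also handed out round-robin.
class StripedPlacementSketch {
    private final List<List<Integer>> racks; // non-negative broker IDs grouped by rack
    private final int[] nextInRack;          // round-robin cursor for each rack
    private final int totalBrokers;
    private int rackOffset;                  // rack that supplies the next partition's first replica

    StripedPlacementSketch(List<List<Integer>> racks, int startOffset) {
        if (racks.isEmpty()) {
            throw new IllegalArgumentException("at least one rack is required");
        }
        this.racks = racks;
        this.nextInRack = new int[racks.size()];
        this.rackOffset = Math.floorMod(startOffset, racks.size());
        int total = 0;
        for (List<Integer> rack : racks) {
            total += rack.size();
        }
        this.totalBrokers = total;
    }

    List<Integer> place(int replicationFactor) {
        // Hard constraints: RF must be positive and cannot exceed the broker count,
        // because a broker may hold at most one replica of a partition.
        if (replicationFactor < 1 || replicationFactor > totalBrokers) {
            throw new IllegalArgumentException("invalid replication factor "
                + replicationFactor + " for " + totalBrokers + " brokers");
        }
        List<Integer> replicas = new ArrayList<>();
        int rackIndex = rackOffset;
        while (replicas.size() < replicationFactor) {
            int broker = pickFromRack(rackIndex, replicas);
            if (broker >= 0) {
                replicas.add(broker);
            }
            rackIndex = (rackIndex + 1) % racks.size();
        }
        // Shift the starting rack so the next partition is striped one rack over.
        rackOffset = (rackOffset + 1) % racks.size();
        return replicas;
    }

    // Returns the next broker in this rack not already chosen, or -1 if none is left.
    private int pickFromRack(int rackIndex, List<Integer> chosen) {
        List<Integer> rack = racks.get(rackIndex);
        for (int attempt = 0; attempt < rack.size(); attempt++) {
            int pos = (nextInRack[rackIndex] + attempt) % rack.size();
            int broker = rack.get(pos);
            if (!chosen.contains(broker)) {
                nextInRack[rackIndex] = (pos + 1) % rack.size();
                return broker;
            }
        }
        return -1;
    }
}
```

With racks A = [10, 11], B = [20, 21], C = [30, 31] and a start offset of 0, three calls to `place(3)` return [10, 20, 30], [21, 31, 11] and [30, 10, 20].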
[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r633926611 ## File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java ## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.metadata.OptionalStringComparator; +import org.apache.kafka.metadata.UsableBroker; + + +/** + * The striped replica placer. + * + * + * GOALS + * The design of this placer attempts to satisfy a few competing goals. Firstly, we want + * to spread the replicas as evenly as we can across racks. In the simple case where + * broker racks have not been configured, this goal is a no-op, of course. But it is the + * highest priority goal in multi-rack clusters. + * + * Our second goal is to spread the replicas evenly across brokers. 
Since we are placing + * multiple partitions, we try to avoid putting each partition on the same set of + * replicas, even if it does satisfy the rack placement goal. However, we treat the rack + * placement goal as higher priority than this goal-- if you configure 10 brokers in rack + * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one + * broker in rack C. If you were to place a lot of partitions with replication factor 3, + * each partition would try to get a replica there. In general racks are supposed to be + * about the same size -- if they aren't, this is a user error. + * + * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced + * brokers. + * + * + * CONSTRAINTS + * In addition to these goals, we have two constraints. Unlike the goals, these are not + * optional -- they are mandatory. Placement will fail if a constraint cannot be + * satisfied. The first constraint is that we can't place more than one replica on the + * same broker. This imposes an upper limit on replication factor-- for example, a 3-node + * cluster can't have any topics with replication factor 4. This constraint comes from + * Kafka's internal design. + * + * The second constraint is that the leader of each partition must be an unfenced broker. + * This constraint is a bit arbitrary. In theory, we could allow people to create + * new topics even if every broker were fenced. However, this would be confusing for + * users. + * + * + * ALGORITHM + * The StripedReplicaPlacer constructor loads the broker data into rack objects. Each + * rack object contains a sorted list of fenced brokers, and a separate sorted list of + * unfenced brokers. The racks themselves are organized into a sorted list, stored inside + * the top-level RackList object. + * + * The general idea is that we place replicas on to racks in a round-robin fashion. 
So if + * we had racks A, B, C, and D, and we were creating a new partition with replication + * factor 3, our first replica might come from A, our second from B, and our third from C. + * Of course our placement would not be very fair if we always started with rack A. + * Therefore, we generate a random starting offset when the RackList is created. So one + * time we might go B, C, D. Another time we might go C, D, A. And so forth. + * + * Note that each partition we generate advances the starting offset by one. + * So in our 4-rack cluster, with 3 partitions, we might choose these racks: + * + * partition 1: A, B, C + * partition 2: B, C, A + * partition 3: C, A, B + * + * This is what generates the characteristic "striped" pattern of this placer. + * + * So far I haven't said anything about how we choose a replica from within a rack. In + * fact, this is also done in a round-robin fashion. So if rack A had replica A0, A1, A2, + * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on. + * Just like with the racks, we add a random
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633909171 ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@ -209,136 +196,6 @@ public void testUnregisterBroker() throws Throwable { } } -static class MockSnapshotWriterBuilder implements Function { Review comment: Well, as I said in the other comments... I'd really like to find a way to keep this test working before we merge this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633900447 ## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala ## @@ -234,22 +259,20 @@ class BrokerMetadataListener(brokerId: Int, clientQuotaManager.handleQuotaRecord(record) } - class HandleNewLeaderEvent(leader: MetaLogLeader) + class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { val imageBuilder = MetadataImageBuilder(brokerId, log, metadataCache.currentImage()) - if (leader.nodeId() < 0) { -imageBuilder.controllerId(None) - } else { -imageBuilder.controllerId(Some(leader.nodeId())) - } + imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala) metadataCache.image(imageBuilder.build()) } } - override def handleNewLeader(leader: MetaLogLeader): Unit = { -eventQueue.append(new HandleNewLeaderEvent(leader)) + override def handleLeaderChange(leader: LeaderAndEpoch): Unit = { +if (leader.isLeader(brokerId)) { + eventQueue.append(new HandleNewLeaderEvent(leader)) Review comment: This doesn't seem correct. We need to know the controller ID even if it's not the same as our broker ID. (It's not even clear that we will always have a broker with the same ID as any controller, since their ID spaces are separate.)
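The following sketch shows the behavior the review asks for. The broker records whichever node the Raft layer reports as leader, including when that ID does not match its own broker ID. `LeaderAndEpochSketch` and `ControllerIdTracker` are hypothetical stand-ins for the `LeaderAndEpoch` value and the listener in the diff. Only the idea of not filtering on `brokerId` comes from the comment.

```java
import java.util.OptionalInt;

// Hypothetical stand-in for the value a Raft listener receives on a leader change.
final class LeaderAndEpochSketch {
    private final OptionalInt leaderId; // empty while the leader is unknown
    private final int epoch;

    LeaderAndEpochSketch(OptionalInt leaderId, int epoch) {
        this.leaderId = leaderId;
        this.epoch = epoch;
    }

    OptionalInt leaderId() { return leaderId; }
    int epoch() { return epoch; }
}

// Hypothetical broker-side listener that records every leader change, not only the
// ones where the new leader happens to share this broker's ID.
class ControllerIdTracker {
    private OptionalInt controllerId = OptionalInt.empty();
    private int lastEpoch = -1;

    void handleLeaderChange(LeaderAndEpochSketch leader) {
        // Controllers and brokers use separate ID spaces, so filtering on
        // "leader == brokerId" would drop exactly the updates the broker needs.
        controllerId = leader.leaderId();
        lastEpoch = leader.epoch();
    }

    OptionalInt controllerId() { return controllerId; }
    int lastEpoch() { return lastEpoch; }
}
```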
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633918245 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -936,21 +944,13 @@ private QuorumController(LogContext logContext, this.replicationControl = new ReplicationControlManager(snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, configurationControl, clusterControl, controllerMetrics); -this.logManager = logManager; +this.raftClient = raftClient; this.metaLogListener = new QuorumMetaLogListener(); -this.curClaimEpoch = -1L; -this.lastCommittedOffset = snapshotReader.epoch(); +this.curClaimEpoch = -1; +this.lastCommittedOffset = -1L; this.writeOffset = -1L; -while (snapshotReader.hasNext()) { -List batch = snapshotReader.next(); -long index = 0; -for (ApiMessage message : batch) { -replay(message, snapshotReader.epoch(), index++); -} -} -snapshotRegistry.createSnapshot(lastCommittedOffset); Review comment: Hmm... I would rather not remove this code if possible. In another comment I suggested moving it to the snapshot load function that you also added. That would also allow the associated test to keep working.
[jira] [Created] (KAFKA-12802) Add a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.
Satish Duggana created KAFKA-12802: -- Summary: Add a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts. Key: KAFKA-12802 URL: https://issues.apache.org/jira/browse/KAFKA-12802 Project: Kafka Issue Type: Sub-task Reporter: Satish Duggana Assignee: Satish Duggana Fix For: 3.0.0 Add a file based cache for consumed remote log metadata for each partition to avoid consuming again in case of broker restarts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9555) Topic-based implementation for the RemoteLogMetadataManager
[ https://issues.apache.org/jira/browse/KAFKA-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-9555: -- Description: The purpose of this task is to implement a {{RemoteLogMetadataManager}} based on an internal topic in Kafka. More details are mentioned in the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-RemoteLogMetadataManagerimplementedwithaninternaltopic]. Done means: - Pull Request available for review and unit-tests. System and integration tests are out of scope of this task and will be part of another task. was: The purpose of this task is to implement a {{RemoteLogMetadataManager}} based on an internal topic in Kafka. Done means: - Pull Request available for review and unit-tests. System and integration tests are out of scope of this task and will be part of another task. > Topic-based implementation for the RemoteLogMetadataManager > --- > > Key: KAFKA-9555 > URL: https://issues.apache.org/jira/browse/KAFKA-9555 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Alexandre Dupriez >Assignee: Satish Duggana >Priority: Major > Fix For: 3.0.0 > > > The purpose of this task is to implement a {{RemoteLogMetadataManager}} based > on an internal topic in Kafka. More details are mentioned in the > [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-RemoteLogMetadataManagerimplementedwithaninternaltopic]. > Done means: > - Pull Request available for review and unit-tests. > System and integration tests are out of scope of this task and will be part > of another task.
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633917057 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -56,24 +57,17 @@ void handleSnapshot(SnapshotReader reader); /** Review comment: It's fine to combine these APIs, but we need to document what happens if the current leader resigns but we don't know who the new leader is yet. Do we get passed a LeaderAndEpoch with the current epoch + 1 and a node ID of -1? If so, do we then expect to see another LeaderAndEpoch with the current epoch + 1 and a valid node ID? In other words, let's say node 0 is the leader and then resigns, and then node 1 becomes the leader. Does it look like this:
```
handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
```
Or would you rather have something like this?
```
handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
```
It seems like the second one will break a lot of invariants, so it should probably be avoided. The first one might break some invariants too, though. We'd have to look. Or you could choose to burn an epoch like this:
```
handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=2))
```
Given that we only have a 31-bit epoch in the first place, that seems unwise, though.
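One way to make the question about leader-change ordering testable is to write the invariants down as a checker. The rules below are assumptions chosen for illustration, not the documented `RaftClient` contract:

- Epochs never decrease.
- Once a leader is known for an epoch, it is never replaced (by -1 or by another node) within that epoch.

Under these rules, the first and third sequences from the comment pass and the second fails. `LeaderNotification` and `LeaderSequenceChecker` are hypothetical names.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical leader-change notification; nodeId == -1 means "leader not yet known".
final class LeaderNotification {
    private final int nodeId;
    private final int epoch;

    LeaderNotification(int nodeId, int epoch) {
        this.nodeId = nodeId;
        this.epoch = epoch;
    }

    int nodeId() { return nodeId; }
    int epoch() { return epoch; }

    @Override
    public String toString() {
        return "LeaderAndEpoch(nodeId=" + nodeId + ", epoch=" + epoch + ")";
    }
}

final class LeaderSequenceChecker {
    private LeaderSequenceChecker() { }

    // Checks the two assumed invariants and returns the first violation, if any.
    static Optional<String> firstViolation(List<LeaderNotification> sequence) {
        LeaderNotification prev = null;
        for (LeaderNotification cur : sequence) {
            if (prev != null) {
                if (cur.epoch() < prev.epoch()) {
                    return Optional.of("epoch went backwards at " + cur);
                }
                boolean sameEpoch = cur.epoch() == prev.epoch();
                // Unknown -> known is allowed within an epoch; known -> anything else is not.
                if (sameEpoch && prev.nodeId() != -1 && cur.nodeId() != prev.nodeId()) {
                    return Optional.of("leader changed within epoch " + cur.epoch() + " at " + cur);
                }
            }
            prev = cur;
        }
        return Optional.empty();
    }
}
```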
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633915001 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2261,6 +2260,11 @@ private Long append(int epoch, List records, boolean isAtomic) { return shutdownComplete; } +@Override +public void resign(int epoch) { +throw new UnsupportedOperationException(); Review comment: This needs to be supported, because the controller will resign if it detects certain bugs.
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633914096 ## File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java ## @@ -331,22 +343,40 @@ public void beginShutdown() { } @Override -public void close() throws InterruptedException { +public void close() { log.debug("Node {}: closing.", nodeId); beginShutdown(); -eventQueue.close(); + +try { +eventQueue.close(); +} catch (InterruptedException e) { +Thread.currentThread().interrupt(); +throw new RuntimeException(e); +} +} + +@Override +public CompletableFuture shutdown(int timeoutMs) { Review comment: It seems like the intention behind this `shutdown` API is to be non-blocking, but this implementation is not non-blocking. Maybe it would be good to add a comment about this? In general it is not possible to do a non-blocking thread join unless you have a third thread (not the calling thread, not the thread being shut down) which can wait for the blocking thread join operation to complete and then complete a future (or whatever). That's why there are two shutdown APIs in LocalLogManager: a non-blocking beginShutdown and a blocking close which does all that, plus the thread join. This is a pattern that I use in other places as well. I think it's more useful than returning a future from close, due to the problem I mentioned above. It could be worth considering for RaftClient in the future.
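The shutdown pattern described in the review can be sketched as follows, assuming a single worker thread:

- `beginShutdown()` only signals the worker to stop.
- `close()` signals and then joins the worker.
- A future-returning `shutdown(timeoutMs)` stays non-blocking for the caller by handing the join to a third thread.

The class name and the sleep loop standing in for an event queue are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical component illustrating the two-phase shutdown pattern.
class TwoPhaseShutdownWorker implements AutoCloseable {
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final Thread worker;

    TwoPhaseShutdownWorker() {
        worker = new Thread(() -> {
            while (!shuttingDown.get()) {
                try {
                    Thread.sleep(10); // stand-in for waiting on and processing queued events
                } catch (InterruptedException e) {
                    // Woken by beginShutdown(); the loop re-checks the shutdown flag.
                }
            }
        }, "sketch-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Non-blocking: ask the worker to stop and return immediately.
    void beginShutdown() {
        shuttingDown.set(true);
        worker.interrupt();
    }

    // Blocking: request shutdown, then wait for the worker thread to exit.
    @Override
    public void close() throws InterruptedException {
        beginShutdown();
        worker.join();
    }

    // Non-blocking for the caller: a third "joiner" thread performs the blocking join and
    // completes the future. timeoutMs should be positive, since join(0) waits forever.
    CompletableFuture<Void> shutdown(long timeoutMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        beginShutdown();
        Thread joiner = new Thread(() -> {
            try {
                worker.join(timeoutMs);
                if (worker.isAlive()) {
                    future.completeExceptionally(
                        new TimeoutException("worker did not stop within " + timeoutMs + " ms"));
                } else {
                    future.complete(null);
                }
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }, "sketch-shutdown-joiner");
        joiner.setDaemon(true);
        joiner.start();
        return future;
    }

    boolean isRunning() {
        return worker.isAlive();
    }
}
```

The design choice mirrors the comment: only a thread other than the caller and the worker can wait on the join without blocking the caller. That is why the future-based variant has to spawn (or borrow) one.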
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633912779 ## File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java ## @@ -218,7 +226,7 @@ synchronized void electLeaderIfNeeded() { /** * The node ID of this local log manager. Each log manager must have a unique ID. */ -private final int nodeId; +public final int nodeId; Review comment: Data fields should not be public.
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633910968 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -624,50 +617,71 @@ public String toString() { return event.future(); } -class QuorumMetaLogListener implements MetaLogListener { +class QuorumMetaLogListener implements RaftClient.Listener { + @Override -public void handleCommits(long offset, List messages) { -appendControlEvent("handleCommits[" + offset + "]", () -> { -if (curClaimEpoch == -1) { -// If the controller is a standby, replay the records that were -// created by the active controller. -if (log.isDebugEnabled()) { -if (log.isTraceEnabled()) { -log.trace("Replaying commits from the active node up to " + -"offset {}: {}.", offset, messages.stream(). -map(m -> m.toString()).collect(Collectors.joining(", "))); +public void handleCommit(BatchReader reader) { +appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> { +try { +boolean isActiveController = curClaimEpoch != -1; +while (reader.hasNext()) { +Batch batch = reader.next(); +long offset = batch.lastOffset(); +List messages = batch.records(); + +if (isActiveController) { +// If the controller is active, the records were already replayed, +// so we don't need to do it here. +log.debug("Completing purgatory items up to offset {}.", offset); + +// Complete any events in the purgatory that were waiting for this offset. +purgatory.completeUpTo(offset); + +// Delete all the in-memory snapshots that we no longer need. +// If we are writing a new snapshot, then we need to keep that around; +// otherwise, we should delete up to the current committed offset. 
+snapshotRegistry.deleteSnapshotsUpTo( +Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); + } else { -log.debug("Replaying commits from the active node up to " + -"offset {}.", offset); +// If the controller is a standby, replay the records that were +// created by the active controller. +if (log.isDebugEnabled()) { +if (log.isTraceEnabled()) { +log.trace("Replaying commits from the active node up to " + +"offset {}: {}.", offset, messages.stream() +.map(ApiMessageAndVersion::toString) +.collect(Collectors.joining(", "))); +} else { +log.debug("Replaying commits from the active node up to " + +"offset {}.", offset); +} +} +for (ApiMessageAndVersion messageAndVersion : messages) { +replay(messageAndVersion.message(), -1, offset); +} } +lastCommittedOffset = offset; } -for (ApiMessage message : messages) { -replay(message, -1, offset); -} -} else { -// If the controller is active, the records were already replayed, -// so we don't need to do it here. -log.debug("Completing purgatory items up to offset {}.", offset); - -// Complete any events in the purgatory that were waiting for this offset. -purgatory.completeUpTo(offset); - -// Delete all the in-memory snapshots that we no longer need. -// If we are writing a new snapshot, then we need to keep that around; -// otherwise, we should delete up to the current committed offset. -snapshotRegistry.deleteSnapshotsUpTo( -Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); +} finally { +reader.close(); } -
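The control flow of the new `handleCommit` can be reduced to the sketch below. It uses simplified stand-in types: `CommittedBatch`, `SketchBatchReader`, and `CommitHandlerSketch` are hypothetical, and records are plain strings. Snapshot deletion and logging from the diff are omitted. The sketch focuses on the per-batch split between active and standby controllers and the guaranteed `close()` of the reader, done here with try-with-resources rather than the diff's try/finally.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical committed batch: the offset of its last record plus its records.
final class CommittedBatch {
    private final long lastOffset;
    private final List<String> records;

    CommittedBatch(long lastOffset, List<String> records) {
        this.lastOffset = lastOffset;
        this.records = records;
    }

    long lastOffset() { return lastOffset; }
    List<String> records() { return records; }
}

// Hypothetical reader over committed batches that must be closed once consumed.
final class SketchBatchReader implements Iterator<CommittedBatch>, AutoCloseable {
    private final Iterator<CommittedBatch> batches;
    private boolean closed = false;

    SketchBatchReader(List<CommittedBatch> batches) {
        this.batches = batches.iterator();
    }

    @Override public boolean hasNext() { return batches.hasNext(); }
    @Override public CommittedBatch next() { return batches.next(); }
    @Override public void close() { closed = true; }
    boolean isClosed() { return closed; }
}

class CommitHandlerSketch {
    private final boolean isActiveController;
    private final List<String> replayed = new ArrayList<>();
    private long lastCommittedOffset = -1L;
    private long purgatoryCompletedUpTo = -1L;

    CommitHandlerSketch(boolean isActiveController) {
        this.isActiveController = isActiveController;
    }

    void handleCommit(SketchBatchReader reader) {
        // try-with-resources closes the reader even if replay throws.
        try (SketchBatchReader r = reader) {
            while (r.hasNext()) {
                CommittedBatch batch = r.next();
                if (isActiveController) {
                    // The active controller replayed these records when it wrote them;
                    // it only completes work that was waiting for this offset.
                    purgatoryCompletedUpTo = batch.lastOffset();
                } else {
                    // A standby replays the records written by the active controller.
                    replayed.addAll(batch.records());
                }
                lastCommittedOffset = batch.lastOffset();
            }
        }
    }

    List<String> replayed() { return new ArrayList<>(replayed); }
    long lastCommittedOffset() { return lastCommittedOffset; }
    long purgatoryCompletedUpTo() { return purgatoryCompletedUpTo; }
}
```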
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633910968 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -624,50 +617,71 @@ public String toString() { return event.future(); } -class QuorumMetaLogListener implements MetaLogListener { +class QuorumMetaLogListener implements RaftClient.Listener { + @Override -public void handleCommits(long offset, List messages) { -appendControlEvent("handleCommits[" + offset + "]", () -> { -if (curClaimEpoch == -1) { -// If the controller is a standby, replay the records that were -// created by the active controller. -if (log.isDebugEnabled()) { -if (log.isTraceEnabled()) { -log.trace("Replaying commits from the active node up to " + -"offset {}: {}.", offset, messages.stream(). -map(m -> m.toString()).collect(Collectors.joining(", "))); +public void handleCommit(BatchReader reader) { +appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> { +try { +boolean isActiveController = curClaimEpoch != -1; +while (reader.hasNext()) { +Batch batch = reader.next(); +long offset = batch.lastOffset(); +List messages = batch.records(); + +if (isActiveController) { +// If the controller is active, the records were already replayed, +// so we don't need to do it here. +log.debug("Completing purgatory items up to offset {}.", offset); + +// Complete any events in the purgatory that were waiting for this offset. +purgatory.completeUpTo(offset); + +// Delete all the in-memory snapshots that we no longer need. +// If we are writing a new snapshot, then we need to keep that around; +// otherwise, we should delete up to the current committed offset. 
+snapshotRegistry.deleteSnapshotsUpTo( +Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); + } else { -log.debug("Replaying commits from the active node up to " + -"offset {}.", offset); +// If the controller is a standby, replay the records that were +// created by the active controller. +if (log.isDebugEnabled()) { +if (log.isTraceEnabled()) { +log.trace("Replaying commits from the active node up to " + +"offset {}: {}.", offset, messages.stream() +.map(ApiMessageAndVersion::toString) +.collect(Collectors.joining(", "))); +} else { +log.debug("Replaying commits from the active node up to " + +"offset {}.", offset); +} +} +for (ApiMessageAndVersion messageAndVersion : messages) { +replay(messageAndVersion.message(), -1, offset); +} } +lastCommittedOffset = offset; } -for (ApiMessage message : messages) { -replay(message, -1, offset); -} -} else { -// If the controller is active, the records were already replayed, -// so we don't need to do it here. -log.debug("Completing purgatory items up to offset {}.", offset); - -// Complete any events in the purgatory that were waiting for this offset. -purgatory.completeUpTo(offset); - -// Delete all the in-memory snapshots that we no longer need. -// If we are writing a new snapshot, then we need to keep that around; -// otherwise, we should delete up to the current committed offset. -snapshotRegistry.deleteSnapshotsUpTo( -Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); +} finally { +reader.close(); } -
[jira] [Assigned] (KAFKA-9578) Kafka Tiered Storage - System Tests
[ https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana reassigned KAFKA-9578: - Assignee: Kamal Chandraprakash (was: Alexandre Dupriez) > Kafka Tiered Storage - System Tests > > > Key: KAFKA-9578 > URL: https://issues.apache.org/jira/browse/KAFKA-9578 > Project: Kafka > Issue Type: Sub-task >Reporter: Harsha >Assignee: Kamal Chandraprakash >Priority: Major > > Initial test cases set up by [~Ying Zheng] > > [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0]
[jira] [Updated] (KAFKA-9579) RLM fetch implementation by adding respective purgatory
[ https://issues.apache.org/jira/browse/KAFKA-9579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-9579: -- Fix Version/s: 3.0.0 > RLM fetch implementation by adding respective purgatory > --- > > Key: KAFKA-9579 > URL: https://issues.apache.org/jira/browse/KAFKA-9579 > Project: Kafka > Issue Type: Sub-task >Reporter: Satish Duggana >Assignee: Ying Zheng >Priority: Major > Fix For: 3.0.0
[jira] [Updated] (KAFKA-9578) Kafka Tiered Storage - System Tests
[ https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-9578: -- Fix Version/s: 3.0.0 > Kafka Tiered Storage - System Tests > > > Key: KAFKA-9578 > URL: https://issues.apache.org/jira/browse/KAFKA-9578 > Project: Kafka > Issue Type: Sub-task >Reporter: Harsha >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.0.0 > > > Initial test cases set up by [~Ying Zheng] > > [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0]
[jira] [Updated] (KAFKA-9990) Supporting transactions in tiered storage
[ https://issues.apache.org/jira/browse/KAFKA-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-9990: -- Fix Version/s: 3.0.0 > Supporting transactions in tiered storage > - > > Key: KAFKA-9990 > URL: https://issues.apache.org/jira/browse/KAFKA-9990 > Project: Kafka > Issue Type: Sub-task >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > Fix For: 3.0.0
[jira] [Assigned] (KAFKA-7739) Kafka Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana reassigned KAFKA-7739:
Assignee: Satish Duggana (was: Harsha)

> Kafka Tiered Storage
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
> Issue Type: New Feature
> Components: core
> Reporter: Harsha
> Assignee: Satish Duggana
> Priority: Major
> Fix For: 3.0.0
>
> KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
[jira] [Updated] (KAFKA-12641) Clear RemoteLogLeaderEpochState entry when it become empty.
[ https://issues.apache.org/jira/browse/KAFKA-12641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-12641:
Fix Version/s: 3.0.0

> Clear RemoteLogLeaderEpochState entry when it become empty.
>
> Key: KAFKA-12641
> URL: https://issues.apache.org/jira/browse/KAFKA-12641
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Satish Duggana
> Assignee: Satish Duggana
> Priority: Major
> Fix For: 3.0.0
>
> https://github.com/apache/kafka/pull/10218#discussion_r609895193
[jira] [Updated] (KAFKA-9555) Topic-based implementation for the RemoteLogMetadataManager
[ https://issues.apache.org/jira/browse/KAFKA-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-9555:
Fix Version/s: 3.0.0

> Topic-based implementation for the RemoteLogMetadataManager
>
> Key: KAFKA-9555
> URL: https://issues.apache.org/jira/browse/KAFKA-9555
> Project: Kafka
> Issue Type: Sub-task
> Components: core
> Reporter: Alexandre Dupriez
> Assignee: Satish Duggana
> Priority: Major
> Fix For: 3.0.0
>
> The purpose of this task is to implement a {{RemoteLogMetadataManager}} based
> on an internal topic in Kafka.
> Done means:
> - Pull Request available for review and unit-tests.
> System and integration tests are out of scope of this task and will be part
> of another task.
[jira] [Updated] (KAFKA-9550) RemoteLogManager implementation
[ https://issues.apache.org/jira/browse/KAFKA-9550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-9550:
Fix Version/s: 3.0.0

> RemoteLogManager implementation
>
> Key: KAFKA-9550
> URL: https://issues.apache.org/jira/browse/KAFKA-9550
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Satish Duggana
> Assignee: Satish Duggana
> Priority: Major
> Fix For: 3.0.0
>
> Implementation of RLM as mentioned in the HLD section of KIP-405
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]
[jira] [Updated] (KAFKA-7739) Kafka Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-7739:
Component/s: core

> Kafka Tiered Storage
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
> Issue Type: New Feature
> Components: core
> Reporter: Harsha
> Assignee: Harsha
> Priority: Major
> Fix For: 3.0.0
>
> KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
[jira] [Updated] (KAFKA-7739) Kafka Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-7739:
Fix Version/s: 3.0.0

> Kafka Tiered Storage
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
> Issue Type: New Feature
> Reporter: Harsha
> Assignee: Harsha
> Priority: Major
> Fix For: 3.0.0
>
> KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633909171 ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@ -209,136 +196,6 @@ public void testUnregisterBroker() throws Throwable { } } -static class MockSnapshotWriterBuilder implements Function { Review comment: I don't want to remove the snapshot handling code or tests that we have for the controller. We should find a way to keep this test working before we merge this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633906477 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java ## @@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) { this.epoch = epoch; } Review comment: Also, I just realized that all the data fields in here are public. Can we fix that? In Java we use public accessors, not public data fields, unless there's some really exceptional reason not to.
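A minimal sketch of the accessor style being requested, assuming the class keeps the constructor shown in the diff, `LeaderAndEpoch(OptionalInt leaderId, int epoch)`. The class name `LeaderAndEpochSketch` and its `equals`/`hashCode` are illustrative only, not the actual Kafka code.

```java
import java.util.Objects;
import java.util.OptionalInt;

// Hypothetical sketch: fields are private and final, exposed only through accessors.
final class LeaderAndEpochSketch {
    private final OptionalInt leaderId; // empty when no leader is known
    private final int epoch;

    LeaderAndEpochSketch(OptionalInt leaderId, int epoch) {
        this.leaderId = Objects.requireNonNull(leaderId, "leaderId");
        this.epoch = epoch;
    }

    // Accessors replace direct reads of public data fields.
    OptionalInt leaderId() {
        return leaderId;
    }

    int epoch() {
        return epoch;
    }

    // Value semantics, since callers compare leader/epoch pairs.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LeaderAndEpochSketch)) return false;
        LeaderAndEpochSketch that = (LeaderAndEpochSketch) o;
        return epoch == that.epoch && leaderId.equals(that.leaderId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderId, epoch);
    }
}
```

Keeping the fields `private final` means the class can later change its internal representation (for example, renaming `leaderId` to `nodeId`) without breaking callers that only use the accessors.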
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633905804 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -202,7 +200,7 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) { @SuppressWarnings("unchecked") public QuorumController build() throws Exception { -if (logManager == null) { +if (raftClient == null) { throw new RuntimeException("You must set a metadata log manager."); Review comment: Since we're now setting a raft client rather than a metadata log manager, the exception message should be updated.
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633904989 ## File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java ## @@ -57,4 +60,9 @@ */ @Override void close(); + +static BatchReader singleton(Batch batch) { Review comment: Can you add JavaDoc for this new method?
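A hypothetical sketch of what JavaDoc for a `singleton(...)` factory might cover. It uses simplified stand-in types (`SketchBatch`, `SketchBatchReader`) rather than the real `Batch` and `BatchReader` from `org.apache.kafka.raft`, whose actual members may differ; only the shape of the documentation is the point here.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

// Simplified stand-in for a batch of records; not Kafka's Batch class.
final class SketchBatch<T> {
    private final long baseOffset;
    private final List<T> records;

    SketchBatch(long baseOffset, List<T> records) {
        this.baseOffset = baseOffset;
        this.records = Collections.unmodifiableList(records);
    }

    long baseOffset() { return baseOffset; }

    List<T> records() { return records; }
}

// Simplified stand-in for a reader over batches; not Kafka's BatchReader interface.
interface SketchBatchReader<T> extends Iterator<SketchBatch<T>>, AutoCloseable {
    @Override
    void close(); // narrowed: no checked exception

    /**
     * Creates a reader that yields exactly one batch.
     *
     * <p>The returned reader holds no external resources, so {@link #close()} is a no-op.
     * It is intended for callers (for example, tests) that already hold a batch in memory.
     *
     * @param batch the only batch the reader will return; must not be null
     * @param <T> the record type
     * @return a reader whose {@code next()} returns {@code batch} once and then
     *         throws {@link NoSuchElementException}
     */
    static <T> SketchBatchReader<T> singleton(SketchBatch<T> batch) {
        Objects.requireNonNull(batch, "batch");
        return new SketchBatchReader<T>() {
            private boolean consumed = false;

            @Override
            public boolean hasNext() { return !consumed; }

            @Override
            public SketchBatch<T> next() {
                if (consumed) throw new NoSuchElementException();
                consumed = true;
                return batch;
            }

            @Override
            public void close() { }
        };
    }
}
```

The JavaDoc states the three things a caller cannot infer from the signature: how many batches are returned, what happens after that, and whether `close()` releases anything.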
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633903456 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java ## @@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) { this.epoch = epoch; } Review comment: Hmm...
* What do you think of using `nodeId` rather than `leaderId`?
* There should be an accessor function to get the `nodeId` out.
* If we really need a shortcut method for comparing against specific node IDs like this, we could call it `LeaderAndEpoch#hasNodeId(int nodeId)`
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633903456 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java ## @@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) { this.epoch = epoch; } Review comment: Hmm...
* `LeaderAndEpoch#leaderId` seems redundant. Any ID inside the leader object is a leader id by definition. `LeaderAndEpoch#nodeId` expresses what we mean more clearly.
* There should be an accessor function to get the `nodeId` out.
* If we really need a shortcut method for comparing against specific node IDs like this, we could call it `LeaderAndEpoch#hasNodeId(int nodeId)`
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633903456 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java ## @@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) { this.epoch = epoch; } Review comment: Hmm...
* `Leader#leaderId` seems redundant. Any ID inside the leader object is a leader id by definition. `Leader#nodeId` expresses what we mean more clearly.
* There should be an accessor function to get the `nodeId` out.
* If we really need a shortcut method for comparing against specific node IDs like this, we could call it `LeaderAndEpoch#hasNodeId(int nodeId)`
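A hypothetical sketch of the naming suggested in these comments: the id is exposed as `nodeId()`, and `hasNodeId(int)` is the shortcut for comparing against one specific node. The class name `LeaderSketch` is invented for illustration and is not the code that was merged.

```java
import java.util.OptionalInt;

// Hypothetical sketch of the suggested API shape; not Kafka's LeaderAndEpoch.
final class LeaderSketch {
    private final OptionalInt nodeId; // empty when no leader is currently known
    private final int epoch;

    LeaderSketch(OptionalInt nodeId, int epoch) {
        this.nodeId = nodeId;
        this.epoch = epoch;
    }

    OptionalInt nodeId() {
        return nodeId;
    }

    int epoch() {
        return epoch;
    }

    // True only if a leader is known and its id equals the given node id.
    boolean hasNodeId(int candidate) {
        return nodeId.isPresent() && nodeId.getAsInt() == candidate;
    }
}
```

Because `nodeId` is an `OptionalInt`, `hasNodeId` has to handle the "no leader" case explicitly; returning `false` there avoids callers writing `nodeId().getAsInt()` and hitting `NoSuchElementException`.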
[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata
cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633900447 ## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala ## @@ -234,22 +259,20 @@ class BrokerMetadataListener(brokerId: Int, clientQuotaManager.handleQuotaRecord(record) } - class HandleNewLeaderEvent(leader: MetaLogLeader) + class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { val imageBuilder = MetadataImageBuilder(brokerId, log, metadataCache.currentImage()) - if (leader.nodeId() < 0) { -imageBuilder.controllerId(None) - } else { -imageBuilder.controllerId(Some(leader.nodeId())) - } + imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala) metadataCache.image(imageBuilder.build()) } } - override def handleNewLeader(leader: MetaLogLeader): Unit = { -eventQueue.append(new HandleNewLeaderEvent(leader)) + override def handleLeaderChange(leader: LeaderAndEpoch): Unit = { +if (leader.isLeader(brokerId)) { + eventQueue.append(new HandleNewLeaderEvent(leader)) Review comment: This is not correct. We need to know the controller ID even if it's not the same as our broker ID. (It's not even clear that we will always have a broker with the same ID as any controller, since their ID spaces are separate.)
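To illustrate the point in Java (the code under review is Scala), here is a hypothetical tracker that records the controller ID on every leader change instead of filtering on `isLeader(brokerId)`. `ControllerIdTracker` and its method are invented for this sketch; the real listener applies the update through its event queue and `MetadataImageBuilder`.

```java
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical stand-in for the broker-side listener; not Kafka's BrokerMetadataListener.
final class ControllerIdTracker {
    private Optional<Integer> controllerId = Optional.empty();

    // Called for every leadership change. The update is deliberately NOT filtered
    // on leaderId == brokerId: brokers and controllers use separate ID spaces, so
    // the controller's id usually differs from ours and must still be recorded.
    void handleLeaderChange(OptionalInt leaderId) {
        if (leaderId.isPresent()) {
            controllerId = Optional.of(leaderId.getAsInt());
        } else {
            controllerId = Optional.empty(); // no known leader: clear the controller id
        }
    }

    Optional<Integer> controllerId() {
        return controllerId;
    }
}
```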
[GitHub] [kafka] bbejeck commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest
bbejeck commented on pull request #10697: URL: https://github.com/apache/kafka/pull/10697#issuecomment-842656430 merged #10697 into trunk
[GitHub] [kafka] bbejeck merged pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest
bbejeck merged pull request #10697: URL: https://github.com/apache/kafka/pull/10697
[GitHub] [kafka] ableegoldman commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores
ableegoldman commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r633866937 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name())); } +// register the store first, so that if later an exception is thrown then eventually while we call `close` Review comment: +1 on not allowing the app to continue after an illegal exception. We need to reserve _some_ kind of exception for actual critical, fatal system errors that a user can't just ignore by spinning up a new thread. And that has essentially been the meaning of these illegal exceptions in Streams thus far. As I mentioned in another thread, I've been very concerned about this in the new handler, since we haven't been strict about properly cleaning up after an illegal exception.
[GitHub] [kafka] ryannedolan commented on a change in pull request #10712: KAFKA-12798: Fixing MM2 rebalance timeout issue when source cluster is not available
ryannedolan commented on a change in pull request #10712: URL: https://github.com/apache/kafka/pull/10712#discussion_r63387 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -198,6 +198,10 @@ + " properties to replicate."; public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class; +private static final String SOURCE_CLUSTER_START_TASK_TIMEOUT_MILLISECOND_CONFIG = "source.cluster.start.task.timeout"; +private static final String SOURCE_CLUSTER_START_TASK_TIMEOUT_MILLISECOND_DOC = "Milliseconds to wait for tasks that affects source cluster at startup"; Review comment: Can we use the existing admin.timeout.ms for this?
[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores
guozhangwang commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r633858383 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name())); } +// register the store first, so that if later an exception is thrown then eventually while we call `close` Review comment: cc @ableegoldman @wcarlson5 @rodesai too.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores
guozhangwang commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r633858091 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name())); } +// register the store first, so that if later an exception is thrown then eventually while we call `close` Review comment: Okay, I think I got what we were discussing now. Originally I was thinking that since these conditions should never happen (when we add state stores to the topology we already check whether the store names exist, so we should never add two stores with the same name), if it ever happens we would always treat it as fatal and crash-stop immediately. On the higher level, I think we should NOT allow users to handle illegal-s/a (IllegalStateException/IllegalArgumentException) themselves and thereby possibly treat them as non-fatal, but obviously today we do not enforce that. So I think we have two options here: 1) at the lower level of the hierarchy, like the state manager here, try to stop the stores when hitting an illegal-s/a; 2) at the higher level, in the stream thread, enforce "stop app" on illegal-s/a. I'm leaning a bit towards 2) here but would love to hear other opinions.
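Option 2 could look roughly like the hypothetical classifier below, applied at the stream-thread level before honoring the user's choice. The enum mirrors the response values of Kafka Streams' `StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse` (`REPLACE_THREAD`, `SHUTDOWN_CLIENT`, `SHUTDOWN_APPLICATION`) but is redefined locally so the sketch compiles on its own. Treating every `IllegalStateException`/`IllegalArgumentException` in the cause chain as fatal is the assumption under discussion, not current Streams behavior.

```java
// Local stand-in for the Streams response enum, redefined so the sketch is self-contained.
enum ThreadResponse { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

// Hypothetical policy helper; not part of the Kafka Streams API.
final class FatalOnIllegalPolicy {
    private static final int MAX_CAUSE_DEPTH = 32; // guards against cyclic cause chains

    // Returns the response to apply. The user's choice is honored only when no
    // IllegalStateException/IllegalArgumentException appears in the cause chain
    // (the original error might be wrapped, e.g. in a StreamsException).
    static ThreadResponse decide(Throwable error, ThreadResponse userChoice) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < MAX_CAUSE_DEPTH; t = t.getCause(), depth++) {
            if (t instanceof IllegalStateException || t instanceof IllegalArgumentException) {
                return ThreadResponse.SHUTDOWN_APPLICATION;
            }
        }
        return userChoice;
    }
}
```

One consequence worth weighing: subclasses such as `NumberFormatException` extend `IllegalArgumentException`, so a blanket rule like this would also make them fatal.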
[GitHub] [kafka] ableegoldman commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores
ableegoldman commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r633854799 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce } final Set changelogTopics = new HashSet<>(); -for (final StateStore stateStore : globalStateStores) { -globalStoreNames.add(stateStore.name()); +for (final StateStore stateStore : topology.globalStateStores()) { final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); changelogTopics.add(sourceTopic); stateStore.init((StateStoreContext) globalProcessorContext, stateStore); Review comment: Yeah, our attitude towards IllegalStateException has been pretty cavalier thus far, and it's one of the main things I'm concerned about with the REPLACE thread functionality. We should definitely be on the lookout for possible IllegalStateException occurrences in the codebase and try to triage them so things aren't just completely screwed up if Streams is allowed to continue after hitting one.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores
guozhangwang commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r633852084 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce } final Set changelogTopics = new HashSet<>(); -for (final StateStore stateStore : globalStateStores) { -globalStoreNames.add(stateStore.name()); +for (final StateStore stateStore : topology.globalStateStores()) { Review comment: Yes, but the reason is that in the unit test we do not really follow the trace of `stateMgr.initialize() -> store.init() -> context.registerStore() -> stateMgr.registerStore()`. That's because the `context` is a mock, which does not use the `stateMgr` at all, and hence the `stores` set is always empty. If we do want to test this call trace, then we need to make the mock context use the actual stateMgr.
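The call chain described in this comment can be reduced to the hypothetical sketch below: `initialize()` calls `store.init(context)`, and the store registers itself through `context.registerStore()`. A mock context that swallows the call leaves the manager's store set empty, while a context that delegates to the manager records the store. All types here are simplified stand-ins, not the Streams internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Stand-in for the processor context: the only call that matters here.
interface SketchContext {
    void registerStore(SketchStore store);
}

final class SketchStore {
    private final String name;

    SketchStore(String name) { this.name = name; }

    String name() { return name; }

    // Mirrors store.init() -> context.registerStore()
    void init(SketchContext context) {
        context.registerStore(this);
    }
}

final class SketchStateManager {
    private final Set<String> stores = new LinkedHashSet<>();
    private final List<SketchStore> topologyStores;
    private SketchContext context;

    SketchStateManager(List<SketchStore> topologyStores) {
        this.topologyStores = new ArrayList<>(topologyStores);
    }

    void setContext(SketchContext context) { this.context = context; }

    // Mirrors stateMgr.initialize() -> store.init()
    void initialize() {
        for (SketchStore store : topologyStores) {
            store.init(context);
        }
    }

    // Mirrors stateMgr.registerStore(): reached only if the context forwards the call.
    void registerStore(SketchStore store) {
        stores.add(store.name());
    }

    Set<String> stores() { return stores; }
}
```

In a test, passing `manager::registerStore` as the context is the simplest way to "make the mock context use the actual stateMgr", as suggested above.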
[jira] [Updated] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-12781:
Affects Version/s: (was: 2.8.0)

> Improve the endOffsets accuracy in TaskMetadata
>
> Key: KAFKA-12781
> URL: https://issues.apache.org/jira/browse/KAFKA-12781
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Walker Carlson
> Priority: Minor
> Fix For: 3.0.0
>
> Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the
> main consumer in streams. It should be possible to get the highest offset in
> the topic via the consumer instead.
[jira] [Commented] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346423#comment-17346423 ]

A. Sophie Blee-Goldman commented on KAFKA-12781:

Ah, that's my bad: I was discussing this with Walker, since I had also misunderstood the meaning of the current endOffsets() API. I recommended that he use the new API from John's KIP, but then took it back because I thought we had reverted the new API in that task-idling KIP. But I guess we only reverted some of the changes; of course we would still have needed currentLag(). In that case I agree with Guozhang: we can knock this out right away with a small PR that leverages the currentLag() API. I'm setting the fix version to 3.0: it may not be critical to address this ASAP, but since it's an easy fix we should get to it by the time 3.0 is released. That will also help with the confusion around what endOffsets() means; if both Guozhang and I misunderstood it, I'm guessing some users will too.

> Improve the endOffsets accuracy in TaskMetadata
>
> Key: KAFKA-12781
> URL: https://issues.apache.org/jira/browse/KAFKA-12781
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.8.0
> Reporter: Walker Carlson
> Priority: Minor
>
> Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the
> main consumer in streams. It should be possible to get the highest offset in
> the topic via the consumer instead.
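A hypothetical sketch of the arithmetic only, assuming `Consumer#currentLag(TopicPartition)` returns an `OptionalLong` that is empty when the lag is not known locally, and that the lag is measured from the consumer's current position to the log end offset. Under that assumption the end offset is position plus lag. The helper `EndOffsetSketch.endOffset` is invented for illustration and does not call the Kafka client.

```java
import java.util.OptionalLong;

// Hypothetical helper; not part of Kafka Streams or the consumer API.
final class EndOffsetSketch {
    // Assumes lag == (log end offset - position), so end offset == position + lag.
    // An unknown lag means the end offset is unknown too, rather than guessed.
    static OptionalLong endOffset(long position, OptionalLong currentLag) {
        if (!currentLag.isPresent()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(position + currentLag.getAsLong());
    }
}
```

Propagating the empty case, instead of falling back to the highest offset seen so far, avoids reproducing the current ambiguity in `endOffsets()`.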
[jira] [Updated] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-12781:
Fix Version/s: 3.0.0

> Improve the endOffsets accuracy in TaskMetadata
>
> Key: KAFKA-12781
> URL: https://issues.apache.org/jira/browse/KAFKA-12781
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.8.0
> Reporter: Walker Carlson
> Priority: Minor
> Fix For: 3.0.0
>
> Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the
> main consumer in streams. It should be possible to get the highest offset in
> the topic via the consumer instead.
[GitHub] [kafka] bbejeck commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest
bbejeck commented on pull request #10697: URL: https://github.com/apache/kafka/pull/10697#issuecomment-842616336 This PR only touched a Python system test file; failures are unrelated.
[GitHub] [kafka] rondagostino commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest
rondagostino commented on pull request #10697: URL: https://github.com/apache/kafka/pull/10697#issuecomment-842602694 Thanks, @bbejeck. Test passed: https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-05-17--001.1621281781--rondagostino--systest_add_cluster_annotation_streams_test--b20e203a0/report.html
[GitHub] [kafka] bbejeck commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest
bbejeck commented on pull request #10697: URL: https://github.com/apache/kafka/pull/10697#issuecomment-842583178 @rondagostino can we run a branch builder for this PR as a sanity check?
[GitHub] [kafka] gharris1727 commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
gharris1727 commented on a change in pull request #8259: URL: https://github.com/apache/kafka/pull/8259#discussion_r633794872 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java ## @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.isolation; + +import static org.junit.Assert.fail; + +import java.lang.management.LockInfo; +import java.lang.management.ManagementFactory; +import java.lang.management.MonitorInfo; +import java.lang.management.ThreadInfo; +import java.net.URL; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SynchronizationTest { + +public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class); + +@Rule +public final TestName testName = new TestName(); + +private String threadPrefix; +private Plugins plugins; +private ThreadPoolExecutor exec; +private Breakpoint dclBreakpoint; +private Breakpoint pclBreakpoint; + +@Before +public void setup() { +TestPlugins.assertAvailable(); +Map pluginProps = Collections.singletonMap( +WorkerConfig.PLUGIN_PATH_CONFIG, +String.join(",", 
TestPlugins.pluginPath()) +); +threadPrefix = SynchronizationTest.class.getSimpleName() ++ "." + testName.getMethodName() + "-"; +dclBreakpoint = new Breakpoint<>(); +pclBreakpoint = new Breakpoint<>(); +plugins = new Plugins(pluginProps) { +@Override +protected DelegatingClassLoader newDelegatingClassLoader(List paths) { +return AccessController.doPrivileged( +(PrivilegedAction) () -> +new SynchronizedDelegatingClassLoader(paths) +); +} +}; +exec = new ThreadPoolExecutor( +2, +2, +1000L, +TimeUnit.MILLISECONDS, +new LinkedBlockingDeque<>(), +threadFactoryWithNamedThreads(threadPrefix) +); + +} + +@After +public void tearDown() throws InterruptedException { +dclBreakpoint.clear(); +pclBreakpoint.clear(); +exec.shutdown(); +exec.awaitTermination(1L, TimeUnit.SECONDS); +} + +private static class Breakpoint { + +private Predicate predicate; +private CyclicBarrier barrier; + +public synchronized void clear() { +if (barrier != null) { +barrier.reset(); +} +predicate = null; +barrier = null; +} + +public synchronized void set(Predicate predicate) { +clear(); +this.predicate = predicate; +// As soon as the barrier is tripped, the barrier will be reset for the next round. +barrier = new CyclicBarrier(2); +} + +/** + * From a thread under test, await for the test orchestrator to continue execution + * @param obj Object to test
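The `Breakpoint` helper in this test pairs a predicate with a two-party `CyclicBarrier`: a thread under test that reaches a matching object waits at the barrier until the test orchestrator also arrives, which lets the test hold a thread at a chosen point. The quoted diff is truncated, so the following is a hypothetical reconstruction of the pattern using only JDK classes; `SketchBreakpoint`, `await`, and `testAwait` are invented names, not the PR's code.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical reconstruction of the breakpoint pattern; not the PR's Breakpoint class.
final class SketchBreakpoint<T> {
    private Predicate<T> predicate;
    private CyclicBarrier barrier;

    synchronized void clear() {
        if (barrier != null) {
            barrier.reset(); // any waiting party receives BrokenBarrierException
        }
        predicate = null;
        barrier = null;
    }

    synchronized void set(Predicate<T> predicate) {
        clear();
        this.predicate = predicate;
        barrier = new CyclicBarrier(2); // thread under test + orchestrator; auto-resets after each trip
    }

    // Called from the thread under test: blocks only if obj matches the predicate.
    void await(T obj) throws InterruptedException, BrokenBarrierException, TimeoutException {
        CyclicBarrier b;
        synchronized (this) {
            if (predicate == null || !predicate.test(obj)) {
                return;
            }
            b = barrier;
        }
        b.await(5, TimeUnit.SECONDS); // wait outside the lock so the orchestrator can proceed
    }

    // Called from the orchestrator: meets the paused thread at the barrier.
    void testAwait() throws InterruptedException, BrokenBarrierException, TimeoutException {
        CyclicBarrier b;
        synchronized (this) {
            b = barrier;
        }
        if (b != null) {
            b.await(5, TimeUnit.SECONDS);
        }
    }
}
```

Waiting outside the `synchronized` block matters: if the thread under test held the monitor while blocked at the barrier, the orchestrator could never enter `testAwait()` and the test itself would deadlock.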
[GitHub] [kafka] gharris1727 commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
gharris1727 commented on a change in pull request #8259: URL: https://github.com/apache/kafka/pull/8259#discussion_r633780467 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ## @@ -48,25 +48,25 @@ import java.util.Arrays; import java.util.Collection; import java.util.Enumeration; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.ServiceLoader; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; public class DelegatingClassLoader extends URLClassLoader { private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); private static final String CLASSPATH_NAME = "classpath"; private static final String UNDEFINED_VERSION = "undefined"; -private final Map, ClassLoader>> pluginLoaders; -private final Map aliases; +private final ConcurrentMap, ClassLoader>> pluginLoaders; +private final ConcurrentMap aliases; Review comment: This was added by @kkonstantine but it does seem to be non-functional. I'll revert the type change.
[GitHub] [kafka] rhauch commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
rhauch commented on a change in pull request #8259: URL: https://github.com/apache/kafka/pull/8259#discussion_r633778524

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
## @@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+    public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class);
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    private String threadPrefix;
+    private Plugins plugins;
+    private ThreadPoolExecutor exec;
+    private Breakpoint dclBreakpoint;
+    private Breakpoint pclBreakpoint;
+
+    @Before
+    public void setup() {
+        TestPlugins.assertAvailable();
+        Map pluginProps = Collections.singletonMap(
+            WorkerConfig.PLUGIN_PATH_CONFIG,
+            String.join(",", TestPlugins.pluginPath())
+        );
+        threadPrefix = SynchronizationTest.class.getSimpleName()
+            + "." + testName.getMethodName() + "-";
+        dclBreakpoint = new Breakpoint<>();
+        pclBreakpoint = new Breakpoint<>();
+        plugins = new Plugins(pluginProps) {
+            @Override
+            protected DelegatingClassLoader newDelegatingClassLoader(List paths) {
+                return AccessController.doPrivileged(
+                    (PrivilegedAction) () ->
+                        new SynchronizedDelegatingClassLoader(paths)
+                );
+            }
+        };
+        exec = new ThreadPoolExecutor(
+            2,
+            2,
+            1000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingDeque<>(),
+            threadFactoryWithNamedThreads(threadPrefix)
+        );
+
+    }
+
+    @After
+    public void tearDown() throws InterruptedException {
+        dclBreakpoint.clear();
+        pclBreakpoint.clear();
+        exec.shutdown();
+        exec.awaitTermination(1L, TimeUnit.SECONDS);
+    }
+
+    private static class Breakpoint {
+
+        private Predicate predicate;
+        private CyclicBarrier barrier;
+
+        public synchronized void clear() {
+            if (barrier != null) {
+                barrier.reset();
+            }
+            predicate = null;
+            barrier = null;
+        }
+
+        public synchronized void set(Predicate predicate) {
+            clear();
+            this.predicate = predicate;
+            // As soon as the barrier is tripped, the barrier will be reset for the next round.
+            barrier = new CyclicBarrier(2);
+        }
+
+        /**
+         * From a thread under test, await for the test orchestrator to continue execution
+         * @param obj Object to test with
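The quoted diff is cut off inside the `Breakpoint` javadoc, so only `clear()` and `set(...)` are visible. Below is a minimal, self-contained sketch of how a two-party `CyclicBarrier` breakpoint of this shape could work. The class name `BreakpointSketch` and the `await(T)` / `release(...)` methods are hypothetical and are not taken from the PR.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

/**
 * Hypothetical sketch: clear() and set() mirror the quoted diff; await(T) and
 * release(...) are assumptions about how the rest of such a class might look.
 */
final class BreakpointSketch<T> {
    private Predicate<T> predicate;
    private CyclicBarrier barrier;

    /** Drops the predicate; threads currently waiting get BrokenBarrierException. */
    synchronized void clear() {
        if (barrier != null) {
            barrier.reset();
        }
        predicate = null;
        barrier = null;
    }

    /** Arms the breakpoint: threads whose object matches the predicate will block. */
    synchronized void set(Predicate<T> predicate) {
        clear();
        this.predicate = predicate;
        // Two parties: one thread under test plus the orchestrator.
        // A CyclicBarrier starts a new generation automatically once it trips.
        barrier = new CyclicBarrier(2);
    }

    /** Called from a thread under test; blocks only if obj matches the predicate. */
    void await(T obj) {
        CyclicBarrier current;
        synchronized (this) {
            if (predicate == null || !predicate.test(obj)) {
                return;
            }
            current = barrier;
        }
        // Block outside the monitor so the synchronized clear() can still run.
        // Caveat: if clear() resets the barrier between the capture above and this
        // call, the thread waits on a fresh generation; assume clear() runs at teardown.
        try {
            current.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // The orchestrator cleared the breakpoint while we waited; continue.
        }
    }

    /** Called by the orchestrator; returns true if a blocked thread was released in time. */
    boolean release(long timeout, TimeUnit unit) {
        CyclicBarrier current;
        synchronized (this) {
            current = barrier;
        }
        if (current == null) {
            return false;
        }
        try {
            current.await(timeout, unit);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (BrokenBarrierException | TimeoutException e) {
            return false;
        }
    }
}
```

Usage: arm the breakpoint with `set(s -> s.equals("target"))`, start the thread under test, then call `release(...)` from the orchestrator once it has inspected whatever state it needs while the other thread is parked.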
[GitHub] [kafka] cmccabe commented on pull request #10696: KAFKA-12777 Refactor and cleanup AutoTopicCreationManager
cmccabe commented on pull request #10696: URL: https://github.com/apache/kafka/pull/10696#issuecomment-842547338 (Copying offline discussion to GitHub) I think creating a base class and using implementation inheritance is worse than the current code. Looking at the code, I see very little commonality between the ZK implementation and the API-based implementation. I think we should just have an interface and two separate implementations. The main code that we'd want to share between the two implementations is the routine that checks if topics have the wrong names. That's a very small amount of code which could simply be in a static method. If you want, you could combine that with checking if a change is "in-flight" by passing in a map from topics to booleans, or similar. Or just have two separate maps in the separate implementations.
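A minimal Java sketch of the structure suggested above: one interface, two independent implementations, and the shared name check as a static method that takes an in-flight map. Every name here (`TopicCreator`, `TopicCreationUtils`, `ZkTopicCreator`, `ApiTopicCreator`, `onResponse`) is hypothetical, and the name rule is a simplified version of Kafka's legal topic names (`[a-zA-Z0-9._-]`, at most 249 characters, not `.` or `..`).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

/** Hypothetical interface; names are illustrative only. */
interface TopicCreator {
    /** Returns the topics this call actually tried to create. */
    List<String> createTopics(Collection<String> topics);
}

/** The small piece of shared logic lives in a static helper, not a base class. */
final class TopicCreationUtils {
    // Simplified rule modeled on Kafka's legal topic characters.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    private TopicCreationUtils() {}

    static boolean isValidName(String name) {
        return name != null
            && name.length() <= 249
            && !name.equals(".")
            && !name.equals("..")
            && LEGAL.matcher(name).matches();
    }

    /** Keeps valid, de-duplicated names that are not already in flight. */
    static List<String> filterCreatable(Collection<String> topics, Map<String, Boolean> inFlight) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            if (isValidName(topic)
                    && !inFlight.getOrDefault(topic, false)
                    && !result.contains(topic)) {
                result.add(topic);
            }
        }
        return result;
    }
}

/** ZooKeeper-style creator: assumed synchronous here, so nothing stays in flight. */
final class ZkTopicCreator implements TopicCreator {
    @Override
    public List<String> createTopics(Collection<String> topics) {
        // A real implementation would write the topic znodes at this point.
        return TopicCreationUtils.filterCreatable(topics, Collections.emptyMap());
    }
}

/** API-based creator: requests are asynchronous, so it keeps its own in-flight map. */
final class ApiTopicCreator implements TopicCreator {
    private final Map<String, Boolean> inFlight = new ConcurrentHashMap<>();

    @Override
    public List<String> createTopics(Collection<String> topics) {
        List<String> toSend = TopicCreationUtils.filterCreatable(topics, inFlight);
        // A real implementation would send one CreateTopics request at this point.
        for (String topic : toSend) {
            inFlight.put(topic, true);
        }
        return toSend;
    }

    /** Called when the (hypothetical) response for a topic arrives. */
    void onResponse(String topic) {
        inFlight.remove(topic);
    }
}
```

The two implementations share nothing except the static helper, which is the point of the suggestion: each one owns its own state (or lack of it) instead of inheriting fields it does not need.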
[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller
cmccabe commented on a change in pull request #10572: URL: https://github.com/apache/kafka/pull/10572#discussion_r633769533

## File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
## @@ -139,10 +140,16 @@ public TopicIdPartition next() {
      * Partitions with no isr members appear in this map under id NO_LEADER.
      */
     private final TimelineHashMap> isrMembers;
+
+    private final Map offlinePartitionCounts;

Review comment:
All the information that is needed is already here. If you delete X partitions that had a leader of -1, you decrement the counter by X.
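To illustrate the point that the existing index already carries the needed information, here is a hypothetical, simplified model that uses plain `HashMap`s instead of `TimelineHashMap`: offline partitions are filed under `NO_LEADER` (-1), and a counter maintained on every transition always equals the size of that bucket. `LeaderIndex` and its methods are illustrative names, not the `BrokersToIsrs` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of a leader index; not Kafka's BrokersToIsrs. */
final class LeaderIndex {
    static final int NO_LEADER = -1;

    private final Map<Integer, Set<String>> partitionsByLeader = new HashMap<>();
    private final Map<String, Integer> leaderOf = new HashMap<>();
    private int offlineCount = 0;

    /** Moves a partition to a new leader, adjusting the counter on each transition. */
    void setLeader(String partition, int leader) {
        Integer previous = leaderOf.put(partition, leader);
        if (previous != null) {
            partitionsByLeader.get(previous).remove(partition);
            if (previous == NO_LEADER) {
                offlineCount--;
            }
        }
        partitionsByLeader.computeIfAbsent(leader, k -> new HashSet<>()).add(partition);
        if (leader == NO_LEADER) {
            offlineCount++;
        }
    }

    /** Deleting X partitions whose leader was NO_LEADER decrements the counter by X. */
    void deletePartition(String partition) {
        Integer previous = leaderOf.remove(partition);
        if (previous == null) {
            return;
        }
        partitionsByLeader.get(previous).remove(partition);
        if (previous == NO_LEADER) {
            offlineCount--;
        }
    }

    int offlineCount() {
        return offlineCount;
    }

    /** The same number, derived directly from the existing index. */
    int offlineCountFromIndex() {
        return partitionsByLeader.getOrDefault(NO_LEADER, Collections.emptySet()).size();
    }
}
```

Either form avoids a separate per-topic map: the counter is updated at the same points where the index itself is updated.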
[jira] [Commented] (KAFKA-12419) Remove Deprecated APIs of Kafka Streams in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-12419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346378#comment-17346378 ]

Josep Prat commented on KAFKA-12419:

Tomorrow I'll create the tickets for the Kafka Streams module. I'll do one per method/class and do my best to find out whether the tickets already exist.

> Remove Deprecated APIs of Kafka Streams in 3.0
> --
>
> Key: KAFKA-12419
> URL: https://issues.apache.org/jira/browse/KAFKA-12419
> Project: Kafka
> Issue Type: Improvement
> Components: streams, streams-test-utils
> Reporter: Guozhang Wang
> Assignee: Tomasz Nguyen
> Priority: Blocker
> Labels: needs-kip
> Fix For: 3.0.0
>
>
> Here's a list of deprecated APIs that we have accumulated in the past; we can
> consider removing them in 3.0:
> * KIP-198: "--zookeeper" flag from StreamsResetter (1.0)
> * KIP-171: "--execute" flag from StreamsResetter (1.1)
> * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1)
> * KIP-251: overloaded "ProcessorContext#forward" (2.0)
> * KIP-276: "StreamsConfig#getConsumerConfig" (2.0)
> * KIP-319: "WindowBytesStoreSupplier#segments" (2.1)
> * KIP-321: "TopologyDescription.Source#topics" (2.1)
> * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1)
> * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1)
> * KIP-365/366: Implicit Scala APIs (2.1)
> * KIP-372: overloaded "KStream#groupBy" (2.1)
> * KIP-307: "Joined#named" (2.3)
> * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3)
> * KIP-429: "PartitionAssignor" interface (2.4)
> * KIP-470: "TopologyTestDriver#pipeInput" (2.4)
> * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4)
> * KIP-479: overloaded "KStream#join" (2.4)
> * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5)
> * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and
> "KafkaStreams#store" (2.5)
> And here's a list of already filed JIRAs for removing deprecated APIs:
> * KAFKA-10434
> * KAFKA-7785
> * KAFKA-12796

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] rajinisivaram merged pull request #10633: KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests
rajinisivaram merged pull request #10633: URL: https://github.com/apache/kafka/pull/10633
[GitHub] [kafka] mumrah commented on pull request #10561: KAFKA-12686 AlterIsr and LeaderAndIsr race condition
mumrah commented on pull request #10561: URL: https://github.com/apache/kafka/pull/10561#issuecomment-842540233 @chia7712 or @cmccabe, let me know if you have any more feedback or questions. I'd like to get this one merged.
[GitHub] [kafka] rajinisivaram commented on pull request #10633: KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests
rajinisivaram commented on pull request #10633: URL: https://github.com/apache/kafka/pull/10633#issuecomment-842540743 @mumrah Thanks for the review. Test failures are not related; merging to trunk.
[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft
cmccabe commented on a change in pull request #10550: URL: https://github.com/apache/kafka/pull/10550#discussion_r633749112

## File path: core/src/main/scala/kafka/server/ControllerApis.scala
## @@ -99,6 +100,9 @@ class ControllerApis(val requestChannel: RequestChannel,
       case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
       case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
       case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+      case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)

Review comment:
We might want the controller to process DescribeAcls for debug purposes. There's no reason to artificially disable it from processing the RPC, although I agree that it will normally not be used.

## File path: clients/src/main/resources/common/message/DeleteAclsRequest.json
## @@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],

Review comment:
(See above comment)
[GitHub] [kafka] cmccabe opened a new pull request #10714: MINOR: add ConfigUtils method for printing configurations
cmccabe opened a new pull request #10714: URL: https://github.com/apache/kafka/pull/10714
[GitHub] [kafka] ableegoldman commented on pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ableegoldman commented on pull request #10675: URL: https://github.com/apache/kafka/pull/10675#issuecomment-842500319 One unrelated failure in RaftClusterTest; otherwise tests passed. Merging to trunk.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
guozhangwang commented on a change in pull request #10675: URL: https://github.com/apache/kafka/pull/10675#discussion_r633174817

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
## @@ -237,13 +237,15 @@ private static boolean isRecoverable(final KafkaException uncaughtException) {
      * @throws TaskMigratedException
      */
     protected void commitTransaction(final Map offsets,
-                                     final ConsumerGroupMetadata consumerGroupMetadata) {
+                                     final ConsumerGroupMetadata consumerGroupMetadata) {
         if (!eosEnabled()) {
             throw new IllegalStateException(formatException("Exactly-once is not enabled"));
         }
         maybeBeginTransaction();
         try {
-            producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
+            // Older brokers don't understand any group metadata beyond the group id, thus we must downgrade the request for eos-v1

Review comment:
nit: maybe clarify a bit better here, e.g. "EXACTLY_ONCE_V2 on the Streams config assumes that broker is at least 2.5 and hence understand ... ; if it is smaller than V2, then the brokers may be on older versions and hence ..."
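The downgrade described in the diff comment can be sketched without the Kafka client on the classpath. `GroupMetadataSketch` and `OffsetCommitDowngrade` are hypothetical stand-ins (not `ConsumerGroupMetadata` itself), and the `-1` / empty-string placeholders for an unknown generation and member id are assumptions of this sketch. Per the review comment, eos-v2 assumes brokers of at least 2.5, so only the eos-v1 path drops everything but the group id.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for the consumer's group metadata. */
final class GroupMetadataSketch {
    // Placeholder values assumed here to mean "unknown".
    static final int UNKNOWN_GENERATION = -1;
    static final String UNKNOWN_MEMBER = "";

    final String groupId;
    final int generationId;
    final String memberId;
    final Optional<String> groupInstanceId;

    GroupMetadataSketch(String groupId, int generationId, String memberId,
                        Optional<String> groupInstanceId) {
        this.groupId = Objects.requireNonNull(groupId);
        this.generationId = generationId;
        this.memberId = Objects.requireNonNull(memberId);
        this.groupInstanceId = Objects.requireNonNull(groupInstanceId);
    }

    /** Metadata carrying only the group id, which older brokers understand. */
    static GroupMetadataSketch groupIdOnly(String groupId) {
        return new GroupMetadataSketch(groupId, UNKNOWN_GENERATION, UNKNOWN_MEMBER, Optional.empty());
    }
}

final class OffsetCommitDowngrade {
    private OffsetCommitDowngrade() {}

    /** eos-v2: pass the full metadata through; otherwise downgrade to the group id. */
    static GroupMetadataSketch metadataForCommit(GroupMetadataSketch full, boolean eosV2) {
        return eosV2 ? full : GroupMetadataSketch.groupIdOnly(full.groupId);
    }
}
```

Keeping the decision in one small function makes the version assumption (eos-v2 implies new-enough brokers) explicit at the single call site that needs it.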
[GitHub] [kafka] mumrah commented on pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade
mumrah commented on pull request #10694: URL: https://github.com/apache/kafka/pull/10694#issuecomment-842509752 Did a `--repeat 4` run of `security_rolling_upgrade_test.py` and everything passed https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4514/
[GitHub] [kafka] cmccabe commented on pull request #10707: KAFKA-12792: Fix metrics bug and introduce TimelineInteger
cmccabe commented on pull request #10707: URL: https://github.com/apache/kafka/pull/10707#issuecomment-842490006 Thanks, @mumrah.

> Should we include a comment about the thread safety of these timeline classes?

Yeah. I added a comment about the classes requiring external synchronization.
[GitHub] [kafka] dongjinleekr commented on pull request #10709: KAFKA-12794: Fix trailing json tokens in DescribeProducersRequest.json
dongjinleekr commented on pull request #10709: URL: https://github.com/apache/kafka/pull/10709#issuecomment-842297201 Nice catch.
[GitHub] [kafka] NLincoln commented on pull request #10709: KAFKA-12794: Fix trailing json tokens in DescribeProducersRequest.json
NLincoln commented on pull request #10709: URL: https://github.com/apache/kafka/pull/10709#issuecomment-842404631 @dajac done! https://issues.apache.org/jira/browse/KAFKA-12800 I can't edit the assignee field of that ticket, but I can submit a patch for it tonight :)
[GitHub] [kafka] ableegoldman merged pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.
ableegoldman merged pull request #10542: URL: https://github.com/apache/kafka/pull/10542
[GitHub] [kafka] jlprat commented on pull request #10710: KAFKA-12796: Removal of deprecated classes under `streams-scala`
jlprat commented on pull request #10710: URL: https://github.com/apache/kafka/pull/10710#issuecomment-842288466
[GitHub] [kafka] jlprat commented on pull request #10711: MINOR: Update Scala to 2.13.6
jlprat commented on pull request #10711: URL: https://github.com/apache/kafka/pull/10711#issuecomment-842513321 The PR was created 5 hours ago and some Jenkins jobs are still running; maybe there has been a problem with those jobs. Anyway, the reported failure is one of the known flaky tests (RaftClusterTest).
[GitHub] [kafka] dajac merged pull request #10709: KAFKA-12794: Fix trailing json tokens in DescribeProducersRequest.json
dajac merged pull request #10709: URL: https://github.com/apache/kafka/pull/10709
[GitHub] [kafka] mumrah merged pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade
mumrah merged pull request #10694: URL: https://github.com/apache/kafka/pull/10694
[GitHub] [kafka] rondagostino commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft
rondagostino commented on a change in pull request #10550: URL: https://github.com/apache/kafka/pull/10550#discussion_r633107724

## File path: core/src/main/scala/kafka/server/ControllerApis.scala
## @@ -99,6 +100,9 @@ class ControllerApis(val requestChannel: RequestChannel,
       case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
       case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
       case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+      case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)

Review comment:
When would the controller ever process `DescribeAcls`? It seems to me that it never would, because brokers don't forward that request. And if that is correct, I think my change to `DescribeAclsRequest.json` to set `"listeners": ["zkBroker", "broker", "controller"],` should instead be setting it to `"listeners": ["zkBroker", "broker"],`.

## File path: clients/src/main/resources/common/message/DeleteAclsRequest.json
## @@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],

Review comment:
As mentioned, I am wondering if this should be `"listeners": ["zkBroker", "broker"],` since the KRaft broker never forwards DescribeAcls to the controller.

## File path: tests/kafkatest/tests/core/zookeeper_authorizer_test.py
## @@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.security.kafka_acls import ACLs
+
+class ZooKeeperAuthorizerTest(Test):

Review comment:
The only two system tests that leverage an authorizer are `ZooKeeperSecurityUpgradeTest` and `TestSecurityRollingUpgrade`. The first is specific to ZooKeeper and does not apply to KRaft at all. The second could be applied to KRaft because it tests changing the inter-broker security protocol across rolls rather than any broker version upgrade, but it has not yet been converted to the KRaft case, plus it does not remove ACLs (which we also want to test here). Also, the authorizer piece at the end is not really the main part of that test. It felt appropriate to create this simple, straightforward test for the specific use of the ZooKeeper-based authorizer. I've added a comment to this test (assuming we keep it) stating that we will test the KRaft replacement authorizer separately.
[GitHub] [kafka] ableegoldman commented on pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ableegoldman commented on pull request #10690: URL: https://github.com/apache/kafka/pull/10690#issuecomment-842508231 Just the one flaky test, `org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable()`, which Luke is going to look into. This should be ready to merge if there are no further suggestions, @ijuma @mjsax.
[GitHub] [kafka] rondagostino commented on pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade
rondagostino commented on pull request #10694: URL: https://github.com/apache/kafka/pull/10694#issuecomment-842403412 Thanks for the review, @mumrah! Regarding your question:

> Looks like we're checking client security protocol + sasl mechanism as well as the inter broker protocol + sasl mechanism. Do we need to do the same for "intercontroller_security_protocol" in KRaft mode?

The answer is yes -- it's a good point. I've opened https://issues.apache.org/jira/browse/KAFKA-12799 to extend the existing tests to apply to KRaft controllers, and I indicated in that ticket that we will have to take security config mutations into account for that implementation as we did here. I hope it's okay that we defer this to when we get to that ticket.
[GitHub] [kafka] NLincoln commented on a change in pull request #10709: KAFKA-12794: Fix trailing json tokens in DescribeProducersRequest.json
NLincoln commented on a change in pull request #10709: URL: https://github.com/apache/kafka/pull/10709#discussion_r633535873

## File path: clients/src/main/resources/common/message/DescribeProducersRequest.json
## @@ -27,6 +27,5 @@
 { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", "about": "The indexes of the partitions to list producers for." }
 ]}
-]}

Review comment:
1e9d680e4225de93169cb357147723d7527b3503
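The fix removes a stray `]}` after the top-level object. To illustrate what "trailing tokens" means here, this is a minimal, hypothetical checker that reports the first non-whitespace character after the top-level object or array closes, while skipping string contents and the `//` line comments that Kafka's message definition files use for their license headers. It is not how Kafka's generator validates these files; a full JSON parser can reject the same input directly (for example, Jackson's `DeserializationFeature.FAIL_ON_TRAILING_TOKENS`).

```java
/** Hypothetical checker; assumes the top-level value is an object or array. */
final class TrailingTokenCheck {
    private TrailingTokenCheck() {}

    /** Returns the index of the first trailing token, or -1 if there is none. */
    static int firstTrailingToken(String json) {
        int depth = 0;
        boolean started = false;
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                // Inside a string literal: only track escapes and the closing quote.
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == '/' && i + 1 < json.length() && json.charAt(i + 1) == '/') {
                int newline = json.indexOf('\n', i);
                if (newline < 0) {
                    break; // the comment runs to the end of the input
                }
                i = newline; // the loop increment then moves past the newline
                continue;
            }
            if (Character.isWhitespace(c)) {
                continue;
            }
            if (started && depth == 0) {
                return i; // something follows the complete top-level value
            }
            switch (c) {
                case '{':
                case '[':
                    depth++;
                    started = true;
                    break;
                case '}':
                case ']':
                    depth--;
                    break;
                case '"':
                    inString = true;
                    break;
                default:
                    break;
            }
        }
        return -1;
    }
}
```

Applied to the pre-fix file, the checker would point at the extra `]}` line; after the fix it returns -1.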
[GitHub] [kafka] ableegoldman merged pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ableegoldman merged pull request #10675: URL: https://github.com/apache/kafka/pull/10675
[GitHub] [kafka] mjsax merged pull request #10708: MINOR: remove unnecessary `public` keyword from `Partitioner` interface
mjsax merged pull request #10708: URL: https://github.com/apache/kafka/pull/10708