[jira] [Created] (KAFKA-14695) broker will get LEADER_AND_ISR is not enabled error while ZK migrating to KRaft
Luke Chen created KAFKA-14695:

Summary: broker will get LEADER_AND_ISR is not enabled error while ZK migrating to KRaft
Key: KAFKA-14695
URL: https://issues.apache.org/jira/browse/KAFKA-14695
Project: Kafka
Issue Type: Bug
Components: kraft
Affects Versions: 3.4.0
Reporter: Luke Chen
Assignee: Luke Chen

Following the docs here: [https://kafka.apache.org/documentation/#kraft_zk_migration]

During the step "Migrating brokers to KRaft", after migrating, the broker logs a large number of errors like this:
{code:java}
org.apache.kafka.common.errors.InvalidRequestException: Received request api key LEADER_AND_ISR which is not enabled
[2023-02-08 12:06:25,776] ERROR Exception while processing request from 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
{code}
This blocks further migration.

It can be worked around by explicitly setting the listener host name. That is, instead of setting the listeners value like this (which is also the default value):
_listeners=PLAINTEXT://:9092_
it should be set as:
_listeners=PLAINTEXT://localhost:9092_

I will update the docs first to unblock other users trying to migrate from ZK to KRaft, and then investigate why this happens.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
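A listener is written as `NAME://host:port`. In the default value the host part is simply empty, and the broker configuration docs describe an empty host as binding to the default interface. The sketch below uses a hypothetical `ListenerSpec` helper, not Kafka's own listener parser, to show how the two values differ. It only illustrates the two settings and does not explain the root cause, which the issue leaves open.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not Kafka's listener parser) that splits a
 * "NAME://host:port" listener string into its three parts.
 */
final class ListenerSpec {
    // listener name, "://", optional host (may be empty), ":", port
    private static final Pattern LISTENER = Pattern.compile("^([A-Za-z0-9_]+)://(.*):(\\d+)$");

    final String name;
    final String host; // "" when the host is omitted, as in the default value
    final int port;

    private ListenerSpec(String name, String host, int port) {
        this.name = name;
        this.host = host;
        this.port = port;
    }

    static ListenerSpec parse(String listener) {
        Matcher m = LISTENER.matcher(listener);
        if (!m.matches()) {
            throw new IllegalArgumentException("Expected NAME://host:port but got: " + listener);
        }
        return new ListenerSpec(m.group(1), m.group(2), Integer.parseInt(m.group(3)));
    }

    /** True when the listener names a host explicitly, as in the workaround value. */
    boolean hasExplicitHost() {
        return !host.isEmpty();
    }
}
```

For example, `ListenerSpec.parse("PLAINTEXT://:9092").hasExplicitHost()` is false, while `ListenerSpec.parse("PLAINTEXT://localhost:9092").host` is `"localhost"`.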
[GitHub] [kafka] satishd commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
satishd commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1101000469 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. + There is no need to advance the state. + + When started, the tier state machine will fetch the local log start offset of the + leader and then build the follower's remote log aux state until the leader's + local log start offset. 
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); Review Comment: This is always a static field unless it is loaded with LogContext. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
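The reviewer is pointing to a common convention. A logger that depends only on the class is shared by all instances and kept in a `static final` field. A logger obtained from Kafka's `LogContext` prefixes messages with a per-instance string, so it has to live in an instance field. The sketch below uses `java.util.logging` so that it runs without Kafka or SLF4J. `PrefixLogger` and `FetcherLike` are hypothetical stand-ins, not the `LogContext` API.

```java
import java.util.logging.Logger;

// Class-scoped logger: it depends only on the class, so one static instance is shared.
final class SharedLoggerComponent {
    private static final Logger LOG = Logger.getLogger(SharedLoggerComponent.class.getName());

    static Logger logger() {
        return LOG;
    }
}

// Hypothetical wrapper, loosely modeled on what a LogContext-created logger does:
// every message is prefixed with a string fixed when the wrapper is created.
final class PrefixLogger {
    private final Logger delegate;
    private final String prefix;

    PrefixLogger(String prefix, Class<?> owner) {
        this.prefix = prefix;
        this.delegate = Logger.getLogger(owner.getName());
    }

    // Returns the text that would be logged for the given message.
    String format(String message) {
        return prefix + message;
    }

    void info(String message) {
        delegate.info(format(message));
    }
}

// Hypothetical component: its prefix depends on constructor input, so the
// logger must be an instance field rather than a static one.
final class FetcherLike {
    private final PrefixLogger log;

    FetcherLike(int fetcherId) {
        this.log = new PrefixLogger("[Fetcher " + fetcherId + "] ", FetcherLike.class);
    }

    PrefixLogger log() {
        return log;
    }
}
```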
[GitHub] [kafka] satishd commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
satishd commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1100998652 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,265 @@ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); Review Comment: No, this is always a static field unless it is loaded with `LogContext`.
[GitHub] [kafka] yashmayya commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation
yashmayya commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1099951075

##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {

Review Comment:
The default value for `PREDICATE_CONFIG` is `""` and not `null`. It looks like this check currently works only because we're using the original configs rather than the parsed configs. I wonder whether we should update the default value and/or this check to be consistent?

Edit: This isn't really related to your change; I just thought I'd bring it up since we were in the area.
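One way to make the check robust to both representations is to test for null and the empty string together. This sketch assumes that an absent key and the empty-string default should both mean "no predicate". `PredicateAliasCheck` and `hasPredicate` are hypothetical names. `"predicate"` is the per-transformation key from `transforms.<alias>.predicate`, after the alias prefix has been stripped.

```java
import java.util.Map;

final class PredicateAliasCheck {
    // Per-transformation key, as in transforms.<alias>.predicate (prefix already stripped).
    static final String PREDICATE_KEY = "predicate";

    private PredicateAliasCheck() {
    }

    /**
     * Returns true only when a non-empty predicate alias is configured.
     * Both a missing entry (null) and the empty-string default count as "no predicate".
     */
    static boolean hasPredicate(Map<String, ?> transformConfigs) {
        Object alias = transformConfigs.get(PREDICATE_KEY);
        return alias != null && !alias.toString().isEmpty();
    }
}
```

With this shape, the check gives the same answer whether it is handed the original configs (key absent) or parsed configs (key present with its `""` default).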
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1100936116 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This + * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may + * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during + * restoration. + * + * The structure of the internals of this write buffer mirrors the structure of the + * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the + * segment stores is buffered in a separate object -- specifically, a map. + */ +public class RocksDBVersionedStoreRestoreWriteBuffer { + +private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class); + +// write buffer for latest value store. value type is Optional in order to track tombstones +// which must be written to the underlying store. +private final Map> latestValueWriteBuffer; +// map from segment id to write buffer. 
segments are stored in reverse-sorted order, +// so getReverseSegments() is more efficient +private final TreeMap segmentsWriteBuffer; +private final BatchWritingVersionedStoreClient dbClient; +private final RocksDBVersionedStoreRestoreClient restoreClient; + +/** + * Creates a new write buffer. + * @param dbClient client for reading from and writing to the underlying persistent store + */ +RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient dbClient) { +this.dbClient = Objects.requireNonNull(dbClient); + +this.latestValueWriteBuffer = new HashMap<>(); +// store in reverse-sorted order, to make getReverseSegments() more efficient +this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x)); +this.restoreClient = new RocksDBVersionedStoreRestoreClient(); +} + +/** + * @return client for writing to (and reading from) the write buffer + */ +VersionedStoreClient getClient() { Review Comment: Why not typed?
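The Javadoc and field comments above describe two data-structure choices, which can be seen in isolation in the following simplified model. A `TreeMap` built with the reversed comparator `(x, y) -> Long.compare(y, x)` iterates segment ids newest-first without any re-sorting. Wrapping latest values in `Optional` lets a tombstone (`Optional.empty()`) be remembered so that it is still written on flush. `RestoreBufferSketch` and its methods are hypothetical, and strings are used only for readability in place of the store's real key, value and segment types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class RestoreBufferSketch {
    // Latest-value buffer: Optional.empty() records a tombstone that must still be flushed.
    private final Map<String, Optional<String>> latestValues = new HashMap<>();
    // Segment id -> buffered entries; the reversed comparator makes iteration newest-first.
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>((x, y) -> Long.compare(y, x));

    void putLatest(String key, String valueOrNull) {
        // a null value is a delete; keep it as an explicit tombstone instead of dropping the key
        latestValues.put(key, Optional.ofNullable(valueOrNull));
    }

    void putInSegment(long segmentId, String key, String value) {
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
    }

    // Segment ids in descending order, straight from the map's iteration order.
    List<Long> reverseSegmentIds() {
        return new ArrayList<>(segments.keySet());
    }

    // Keys whose buffered latest value is a tombstone.
    List<String> tombstonedKeys() {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Optional<String>> e : latestValues.entrySet()) {
            if (!e.getValue().isPresent()) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }
}
```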
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1100927416 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) { // VisibleForTesting void restoreBatch(final Collection> records) { -throw new UnsupportedOperationException("not yet implemented"); +// advance stream time to the max timestamp in the batch +for (final ConsumerRecord record : records) { +observedStreamTime = Math.max(observedStreamTime, record.timestamp()); +} + +final VersionedStoreClient restoreClient = restoreWriteBuffer.getClient(); Review Comment: Why is the generic not typed?
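The loop added to `restoreBatch()` only ever moves stream time forward. It ends at the larger of the previous value and the maximum record timestamp in the batch, regardless of the order in which records arrive. Below is a standalone version of that loop, with a hypothetical `TimestampedRecord` type standing in for `ConsumerRecord`.

```java
import java.util.List;

final class StreamTimeSketch {
    // Hypothetical stand-in for ConsumerRecord; only the timestamp matters here.
    static final class TimestampedRecord {
        final long timestamp;

        TimestampedRecord(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    private StreamTimeSketch() {
    }

    // Same shape as the loop in restoreBatch(): a running maximum over the batch.
    static long advance(long observedStreamTime, List<TimestampedRecord> batch) {
        long streamTime = observedStreamTime;
        for (TimestampedRecord record : batch) {
            streamTime = Math.max(streamTime, record.timestamp);
        }
        return streamTime;
    }
}
```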
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920982 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This + * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may + * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during + * restoration. + * + * The structure of the internals of this write buffer mirrors the structure of the + * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the + * segment stores is buffered in a separate object -- specifically, a map. + */ +public class RocksDBVersionedStoreRestoreWriteBuffer { + +private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class); + +// write buffer for latest value store. value type is Optional in order to track tombstones +// which must be written to the underlying store. +private final Map> latestValueWriteBuffer; +// map from segment id to write buffer. 
segments are stored in reverse-sorted order, +// so getReverseSegments() is more efficient +private final TreeMap segmentsWriteBuffer; +private final BatchWritingVersionedStoreClient dbClient; +private final RocksDBVersionedStoreRestoreClient restoreClient; + +/** + * Creates a new write buffer. + * @param dbClient client for reading from and writing to the underlying persistent store + */ +RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient dbClient) { +this.dbClient = Objects.requireNonNull(dbClient); + +this.latestValueWriteBuffer = new HashMap<>(); +// store in reverse-sorted order, to make getReverseSegments() more efficient +this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x)); +this.restoreClient = new RocksDBVersionedStoreRestoreClient(); +} + +/** + * @return client for writing to (and reading from) the write buffer + */ +VersionedStoreClient getClient() { +return restoreClient; +} + +/** + * Flushes the contents of the write buffer into the persistent store, and clears the write + * buffer in the process. + * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch} + */ +void flush() throws RocksDBException { + +// flush segments first, as this is consistent with the store always writing to +// older segments/stores before later ones +try (final WriteBatch segmentsBatch = new WriteBatch()) { +final List allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE); +if (allSegments.size() > 0) { +// collect entries into write batch +for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) { +final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment(); +for (final Map.Entry segmentEntry : bufferSegment.getAll()) { +
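The ordering rule in `flush()` can be modeled without RocksDB: all buffered segment writes go out in one batch before the latest-value store is written, and the buffer is then cleared. In this hypothetical sketch a `List<String>` stands in for a `WriteBatch`, and the entry format is invented for readability. The quoted source is cut off inside the segment loop, so the latest-value step and the clearing step follow the comments and Javadoc rather than visible code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FlushOrderSketch {
    // segment id -> buffered entries, newest segment first (same comparator as the buffer above)
    final TreeMap<Long, Map<String, String>> segments = new TreeMap<>((x, y) -> Long.compare(y, x));
    // buffered entries for the latest-value store
    final Map<String, String> latest = new HashMap<>();
    // every "batch" handed to the simulated store, in write order
    final List<List<String>> writtenBatches = new ArrayList<>();

    void flush() {
        // 1) collect all segment entries into a single batch and write it first
        List<String> segmentsBatch = new ArrayList<>();
        for (Map.Entry<Long, Map<String, String>> segment : segments.entrySet()) {
            for (Map.Entry<String, String> entry : segment.getValue().entrySet()) {
                segmentsBatch.add("segment-" + segment.getKey() + ":" + entry.getKey() + "=" + entry.getValue());
            }
        }
        if (!segmentsBatch.isEmpty()) {
            writtenBatches.add(segmentsBatch);
        }

        // 2) only then write the latest-value store entries
        List<String> latestBatch = new ArrayList<>();
        for (Map.Entry<String, String> entry : latest.entrySet()) {
            latestBatch.add("latest:" + entry.getKey() + "=" + entry.getValue());
        }
        if (!latestBatch.isEmpty()) {
            writtenBatches.add(latestBatch);
        }

        // 3) the buffer is empty after a flush
        segments.clear();
        latest.clear();
    }
}
```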
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920384 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ## @@ -0,0 +1,269 @@
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920025 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ## @@ -0,0 +1,269 @@
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1100910560 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This + * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may + * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during + * restoration. + * + * The structure of the internals of this write buffer mirrors the structure of the + * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the + * segment stores is buffered in a separate object -- specifically, a map. + */ +public class RocksDBVersionedStoreRestoreWriteBuffer { + +private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class); + +// write buffer for latest value store. value type is Optional in order to track tombstones +// which must be written to the underlying store. +private final Map> latestValueWriteBuffer; +// map from segment id to write buffer. 
segments are stored in reverse-sorted order, +// so getReverseSegments() is more efficient +private final TreeMap segmentsWriteBuffer; +private final BatchWritingVersionedStoreClient dbClient; +private final RocksDBVersionedStoreRestoreClient restoreClient; + +/** + * Creates a new write buffer. + * @param dbClient client for reading from and writing to the underlying persistent store + */ +RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient dbClient) { +this.dbClient = Objects.requireNonNull(dbClient); + +this.latestValueWriteBuffer = new HashMap<>(); +// store in reverse-sorted order, to make getReverseSegments() more efficient +this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x)); +this.restoreClient = new RocksDBVersionedStoreRestoreClient(); +} + +/** + * @return client for writing to (and reading from) the write buffer + */ +VersionedStoreClient getClient() { +return restoreClient; +} + +/** + * Flushes the contents of the write buffer into the persistent store, and clears the write + * buffer in the process. + * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch} + */ +void flush() throws RocksDBException { + +// flush segments first, as this is consistent with the store always writing to +// older segments/stores before later ones +try (final WriteBatch segmentsBatch = new WriteBatch()) { +final List allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE); +if (allSegments.size() > 0) { +// collect entries into write batch +for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) { +final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment(); +for (final Map.Entry segmentEntry : bufferSegment.getAll()) { +
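The buffering strategy described in the class Javadoc above works as follows. There is one map for the latest value store and one map per segment. Tombstones are tracked as empty `Optional`s. Segments are kept in a reverse-sorted `TreeMap`, and segments are flushed before the latest value store. That strategy can be sketched in isolation. This is a minimal sketch under stated assumptions, not the PR's implementation. `SimpleRestoreWriteBuffer`, `Sink`, and the use of `String` keys instead of `Bytes` are hypothetical simplifications, and `Sink` stands in for adding to a RocksDB `WriteBatch` and writing it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for RocksDBVersionedStoreRestoreWriteBuffer.
class SimpleRestoreWriteBuffer {

    // Hypothetical flush destination (stands in for WriteBatch + the underlying stores).
    interface Sink {
        void putSegment(long segmentId, String key, byte[] value);

        void putLatest(String key, byte[] valueOrNull); // null means "write a delete"
    }

    // Latest value store buffer: Optional.empty() records a tombstone that must still
    // be written to the underlying store when the buffer is flushed.
    private final Map<String, Optional<byte[]>> latestValueWriteBuffer = new HashMap<>();

    // Segment id -> buffered entries, reverse-sorted so iteration visits the newest segment first.
    private final TreeMap<Long, Map<String, byte[]>> segmentsWriteBuffer =
        new TreeMap<>((x, y) -> Long.compare(y, x));

    void putLatest(String key, byte[] valueOrNull) {
        latestValueWriteBuffer.put(key, Optional.ofNullable(valueOrNull));
    }

    void putSegment(long segmentId, String key, byte[] value) {
        segmentsWriteBuffer.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
    }

    // Buffered segment ids, newest first (cheap because of the reverse comparator).
    List<Long> reverseSegmentIds() {
        return new ArrayList<>(segmentsWriteBuffer.keySet());
    }

    // Flush all segments before the latest value store, then clear the buffer.
    void flush(Sink sink) {
        for (Map.Entry<Long, Map<String, byte[]>> segment : segmentsWriteBuffer.entrySet()) {
            for (Map.Entry<String, byte[]> entry : segment.getValue().entrySet()) {
                sink.putSegment(segment.getKey(), entry.getKey(), entry.getValue());
            }
        }
        for (Map.Entry<String, Optional<byte[]>> entry : latestValueWriteBuffer.entrySet()) {
            sink.putLatest(entry.getKey(), entry.getValue().orElse(null));
        }
        segmentsWriteBuffer.clear();
        latestValueWriteBuffer.clear();
    }
}
```

The reverse comparator only changes iteration order, so lookups by segment id cost the same as with natural ordering.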
[GitHub] [kafka] showuon merged pull request #13221: MINOR: Fix hyperlink tags in upgrade docs
showuon merged PR #13221: URL: https://github.com/apache/kafka/pull/13221 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1100908054 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -333,10 +384,34 @@ interface VersionedStoreSegment { long segmentIdForTimestamp(long timestamp); } +/** + * A {@link VersionedStoreClient} which additionally supports batch writes into its latest + * value store. + * + * @param the segment type used by this client + */ +interface BatchWritingVersionedStoreClient extends VersionedStoreClient { Review Comment: Why do we need this interface? Can't we just add both methods to `RocksDBVersionedStoreClient` directly?
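The review question contrasts two designs. One is a dedicated sub-interface. The other adds the batch-write methods directly to the concrete `RocksDBVersionedStoreClient`. One common argument for the sub-interface is that a consumer such as the restore write buffer can depend on the narrow contract alone and be tested with a lightweight fake. The sketch below illustrates only that general pattern. Every name in it (`BaseClient`, `BatchWritingClient`, `addLatestToBatch`, `FakeBatchWritingClient`, `BufferUnderTest`) is hypothetical and does not reflect the methods actually added in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical narrow base contract (loosely analogous to a VersionedStoreClient).
interface BaseClient<T> {
    byte[] getLatestValue(String key);
}

// Hypothetical extension that adds batch writes (loosely analogous to a batch-writing client).
interface BatchWritingClient<T> extends BaseClient<T> {
    void addLatestToBatch(String key, byte[] value, List<String> batch);
}

// A test fake that satisfies the contract without any RocksDB instance.
class FakeBatchWritingClient implements BatchWritingClient<Object> {
    @Override
    public byte[] getLatestValue(String key) {
        return null;
    }

    @Override
    public void addLatestToBatch(String key, byte[] value, List<String> batch) {
        batch.add(key); // record the key instead of writing to a real WriteBatch
    }
}

// Hypothetical consumer that depends on the interface, not on a concrete store client.
class BufferUnderTest {
    private final BatchWritingClient<?> client;

    BufferUnderTest(BatchWritingClient<?> client) {
        this.client = client;
    }

    List<String> flushKeys(List<String> keys) {
        List<String> batch = new ArrayList<>();
        for (String key : keys) {
            client.addLatestToBatch(key, new byte[0], batch);
        }
        return batch;
    }
}
```

The trade-off is one extra type in exchange for a smaller dependency surface. If the concrete client is the only implementation and tests use the real store anyway, adding the methods directly, as the reviewer suggests, is the simpler option.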
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1100902144 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -0,0 +1,877 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult; +import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A persistent, versioned key-value store based on RocksDB. 
+ * + * This store implementation consists of a "latest value store" and "segment stores." The latest + * record version for each key is stored in the latest value store, while older record versions + * are stored in the segment stores. Conceptually, each record version has two associated + * timestamps: + * + * a {@code validFrom} timestamp. This timestamp is explicitly associated with the record + * as part of the {@link VersionedKeyValueStore#put(Object, Object, long)} call to the store; + * i.e., this is the record's timestamp. + * a {@code validTo} timestamp. This is the timestamp of the next record (or deletion) + * associated with the same key, and is implicitly associated with the record. This timestamp + * can change as new records are inserted into the store. + * + * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and + * can change as new record versions are inserted into the store (and validTo changes as a result). + * + * Old record versions are stored in segment stores according to their validTo timestamps. This + * allows for efficient expiry of old record versions, as entire segments can be dropped from the + * store at a time, once the records contained in the segment are no longer relevant based on the + * store's history retention (for an explanation of "history retention", see + * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single + * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}. + */ +public class RocksDBVersionedStore implements VersionedKeyValueStore { +private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class); +// a marker to indicate that no record version has yet been found as part of an ongoing +// put() procedure. any value which is not a valid record timestamp will do. +private
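The validFrom/validTo semantics in the Javadoc above can be illustrated with a small in-memory model. Each key maps to a `TreeMap` from validFrom to value, and a deletion is stored as a `null` value. A point-in-time lookup takes the latest version whose validFrom is at or before the query timestamp. That version is therefore valid up to, but excluding, the next version's validFrom. This is a hypothetical sketch: `InMemoryVersionedModel` is not a Kafka class, and it ignores segments and history retention entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of versioned-store semantics (no RocksDB, no segments).
class InMemoryVersionedModel<K, V> {
    // key -> (validFrom -> value); a null value marks a deletion (tombstone).
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    // Records a new version (or a deletion, if value == null) valid from `timestamp`.
    void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Returns the value valid at asOfTimestamp, i.e. validFrom <= asOf < validTo,
    // where validTo is the validFrom of the next version (or deletion) of the same key.
    V get(K key, long asOfTimestamp) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // validTo of the version starting at validFrom (assumes the key has versions);
    // Long.MAX_VALUE if that version is still the latest one.
    long validTo(K key, long validFrom) {
        Long next = versions.get(key).higherKey(validFrom);
        return next == null ? Long.MAX_VALUE : next;
    }
}
```

Inserting an out-of-order version, for example `put("k", "v1.5", 15L)` after versions at 10 and 20, shrinks the validTo of the version at 10 from 20 to 15. This is the "validTo can change" behavior the Javadoc describes.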
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1100896272 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult; +import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A persistent, versioned key-value store based on RocksDB. + * + * This store implementation consists of a "latest value store" and "segment stores." The latest + * record version for each key is stored in the latest value store, while older record versions + * are stored in the segment stores. Conceptually, each record version has two associated + * timestamps: + * + * a {@code validFrom} timestamp. 
This timestamp is explicitly associated with the record + * as part of the {@link VersionedKeyValueStore#put(Object, Object, long)} call to the store; + * i.e., this is the record's timestamp. + * a {@code validTo} timestamp. This is the timestamp of the next record (or deletion) + * associated with the same key, and is implicitly associated with the record. This timestamp + * can change as new records are inserted into the store. + * + * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and + * can change as new record versions are inserted into the store (and validTo changes as a result). + * + * Old record versions are stored in segment stores according to their validTo timestamps. This + * allows for efficient expiry of old record versions, as entire segments can be dropped from the + * store at a time, once the records contained in the segment are no longer relevant based on the + * store's history retention (for an explanation of "history retention", see + * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single + * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}. + */ +public class RocksDBVersionedStore implements VersionedKeyValueStore { +private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class); +// a marker to indicate that no record version has yet been found as part of an ongoing +// put() procedure. any value which is not a valid record timestamp will do. +private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE; + +private final String name; +private final long historyRetention; +private final RocksDBMetricsRecorder metricsRecorder; + +private final RocksDBStore latestValueStore; +private final LogicalKeyValueSegments segmentStores; +private final LatestValueSchema latestValueSchema; +private final SegmentValueSchema segmentValueSchema; +private final
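The Javadoc also states that old versions are placed in segments by their validTo timestamp, so that whole segments can be dropped once they fall outside history retention. A minimal sketch of that bookkeeping follows. It assumes that segment ids are computed as `timestamp / segmentInterval`. It also assumes that a segment is expired once its exclusive upper bound is at or below `observedStreamTime - historyRetention`, because at that point no version in it can be valid at any still-queryable timestamp. Both formulas, the class name `SegmentMath`, and its parameter names are illustrative assumptions rather than the store's actual code.

```java
// Hypothetical helper illustrating segment placement by validTo and whole-segment expiry.
final class SegmentMath {
    private final long segmentIntervalMs;
    private final long historyRetentionMs;

    SegmentMath(long segmentIntervalMs, long historyRetentionMs) {
        if (segmentIntervalMs <= 0) {
            throw new IllegalArgumentException("segment interval must be positive");
        }
        this.segmentIntervalMs = segmentIntervalMs;
        this.historyRetentionMs = historyRetentionMs;
    }

    // Assumed mapping: an old record version goes into the segment covering its validTo.
    long segmentIdForTimestamp(long validTo) {
        return validTo / segmentIntervalMs;
    }

    // Every validTo in segment `id` is < (id + 1) * interval. Since validTo is exclusive,
    // once that bound is <= the retention boundary, no version in the segment is still
    // valid at a queryable timestamp, so the whole segment can be dropped at once.
    boolean isExpired(long segmentId, long observedStreamTimeMs) {
        long segmentEndExclusive = (segmentId + 1) * segmentIntervalMs;
        return segmentEndExclusive <= observedStreamTimeMs - historyRetentionMs;
    }
}
```

Because expiry is decided per segment, a single check drops every version in that segment. In this model, the trade-off is that a version can outlive its retention by up to one segment interval.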
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1100893004 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -0,0 +1,792 @@
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1100892271 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -0,0 +1,792 @@
[GitHub] [kafka] jsancio commented on a diff in pull request #13207: KAFKA-14664; Fix inaccurate raft idle ratio metric
jsancio commented on code in PR #13207: URL: https://github.com/apache/kafka/pull/13207#discussion_r1100891108 ## raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java: ## @@ -133,26 +131,27 @@ public KafkaRaftMetrics(Metrics metrics, String metricGrpPrefix, QuorumState sta "The average number of records appended per sec as the leader of the raft quorum."), new Rate(TimeUnit.SECONDS, new WindowedSum())); -this.pollIdleSensor = metrics.sensor("poll-idle-ratio"); -this.pollIdleSensor.add(metrics.metricName("poll-idle-ratio-avg", +this.pollDurationSensor = metrics.sensor("poll-idle-ratio"); +this.pollDurationSensor.add(metrics.metricName( +"poll-idle-ratio-avg", metricGroupName, -"The average fraction of time the client's poll() is idle as opposed to waiting for the user code to process records."), -new Avg()); +"The ratio of time the Raft IO thread is idle as opposed to " + +"doing work (e.g. handling requests or replicating from the leader)" +), +new TimeRatio(1.0) +); } public void updatePollStart(long currentTimeMs) { -if (pollEndMs.isPresent() && pollStartMs.isPresent()) { -long pollTimeMs = Math.max(pollEndMs.getAsLong() - pollStartMs.getAsLong(), 0L); -long totalTimeMs = Math.max(currentTimeMs - pollStartMs.getAsLong(), 1L); -this.pollIdleSensor.record(pollTimeMs / (double) totalTimeMs, currentTimeMs); -} - this.pollStartMs = OptionalLong.of(currentTimeMs); -this.pollEndMs = OptionalLong.empty(); } public void updatePollEnd(long currentTimeMs) { -this.pollEndMs = OptionalLong.of(currentTimeMs); +if (pollStartMs.isPresent()) { +long pollDurationMs = Math.max(currentTimeMs - pollStartMs.getAsLong(), 0L); Review Comment: Hmm. `KafkaRaftClient` uses `Time.milliseconds`. I think this is true if it used `Time.hiResClockMs`, no?
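The review comment is anchored on the `Math.max(..., 0L)` clamp, and it turns on which clock supplies the timestamps. In Kafka's `Time` abstraction, `SystemTime.milliseconds()` is backed by the wall clock, which can jump backwards. `hiResClockMs()` is derived from `nanoseconds()` (`System.nanoTime()`), which is monotonic. One plausible reading is therefore that a non-negative poll duration is only guaranteed with the monotonic clock, so the clamp still matters with `Time.milliseconds`. The sketch below shows the idle-ratio idea with such a clamp. `IdleRatioTracker` is hypothetical and does not reproduce Kafka's `TimeRatio` stat, whose internals are not shown in the diff. The clock is injected as a `LongSupplier` so that a backwards jump can be simulated.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;

// Hypothetical tracker: fraction of elapsed time spent idle inside poll(), measured with a
// caller-supplied millisecond clock. Not Kafka's TimeRatio.
final class IdleRatioTracker {
    private final LongSupplier clockMs;
    private final long windowStartMs;
    private OptionalLong pollStartMs = OptionalLong.empty();
    private long idleMs = 0L;

    IdleRatioTracker(LongSupplier clockMs) {
        this.clockMs = clockMs;
        this.windowStartMs = clockMs.getAsLong();
    }

    void pollStart() {
        pollStartMs = OptionalLong.of(clockMs.getAsLong());
    }

    void pollEnd() {
        if (pollStartMs.isPresent()) {
            // A wall clock may move backwards; clamp so a negative span is never recorded.
            idleMs += Math.max(clockMs.getAsLong() - pollStartMs.getAsLong(), 0L);
            pollStartMs = OptionalLong.empty();
        }
    }

    // Idle fraction of the elapsed window, capped at 1.0; elapsed time is at least 1 ms.
    double idleRatio() {
        long elapsedMs = Math.max(clockMs.getAsLong() - windowStartMs, 1L);
        return Math.min(1.0, idleMs / (double) elapsedMs);
    }
}
```

With a monotonic source such as `System.nanoTime()`, the clamp would never trigger. With a wall clock, it prevents an NTP adjustment from producing a negative idle time.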
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1100890258 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -0,0 +1,792 @@
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String, * * @param topicPartition topic partition * @param fetchState current partition fetch state. - * @param leaderEpochInRequest current leader epoch sent in the fetch request. - * @param leaderLogStartOffset log-start-offset in the leader replica. + * @param fetchPartitionData the fetch request data for this topic partition Review Comment: the returned value indicates whether the handler method returned with or without error. It's mentioned in the documentation description, following the convention of the other handler functions.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String, * * @param topicPartition topic partition * @param fetchState current partition fetch state. - * @param leaderEpochInRequest current leader epoch sent in the fetch request. - * @param leaderLogStartOffset log-start-offset in the leader replica. + * @param fetchPartitionData the fetch request data for this topic partition Review Comment: The returned value indicates whether the handler method returned with or without an error. It's mentioned in the documentation, but I'll update it to be more noticeable.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String, * * @param topicPartition topic partition * @param fetchState current partition fetch state. - * @param leaderEpochInRequest current leader epoch sent in the fetch request. - * @param leaderLogStartOffset log-start-offset in the leader replica. + * @param fetchPartitionData the fetch request data for this topic partition Review Comment: The returned value indicates whether the handler method returned with or without an error. I'll update the documentation.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1100802687 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. + There is no need to advance the state. + + When started, the tier state machine will fetch the local log start offset of the + leader and then build the follower's remote log aux state until the leader's + local log start offset. 
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); Review Comment: I'll remove the static.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1100799705 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import kafka.server.checkpoints.LeaderEpochCheckpointFile; +import kafka.server.epoch.EpochEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.immutable.Seq; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution and only the start is needed. + There is no need to advance the state. + + When started, the tier state machine will fetch the local log start offset of the + leader and then build the follower's remote log aux state until the leader's + local log start offset. 
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); + +private LeaderEndPoint leader; +private ReplicaManager replicaMgr; +private Integer fetchBackOffMs; Review Comment: Sorry, I thought I had removed it in my previous commits, but I guess not. I'll pick it up in the next commits.
[jira] [Created] (KAFKA-14694) RPCProducerIdManager should not wait for a new block
Jeff Kim created KAFKA-14694: Summary: RPCProducerIdManager should not wait for a new block Key: KAFKA-14694 URL: https://issues.apache.org/jira/browse/KAFKA-14694 Project: Kafka Issue Type: Bug Reporter: Jeff Kim Assignee: Jeff Kim
RPCProducerIdManager initiates an async request to the controller to grab a block of producer IDs and then blocks waiting for a response from the controller. This is done in the request handler threads while holding a global lock. This means that if many producers are requesting producer IDs and the controller is slow to respond, many threads can get stuck waiting for the lock.
This may also be a deadlock concern. For example, if the controller has 1 request handler thread (1 chosen for simplicity) and receives an InitProducerId request, it may deadlock. More generally, any time the controller has N InitProducerId requests where N >= the number of request handler threads, there is a potential for deadlock. Consider this:
1. The request handler thread tries to handle an InitProducerId request to the controller by forwarding an AllocateProducerIds request.
2. The request handler thread then waits on the controller response (timed poll on nextProducerIdBlock).
3. The controller's request handler threads need to pick this request up and handle it, but they are all blocked waiting for the forwarded AllocateProducerIds response.
We should not block while waiting for a new block, and should instead return immediately to free the request handler threads. -- This message was sent by Atlassian Jira (v8.20.10#820010)
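A minimal sketch of the non-blocking behaviour the ticket asks for. It assumes a hypothetical `NonBlockingProducerIdManager` class and an `allocateBlock` supplier standing in for the AllocateProducerIds round trip; neither is Kafka's actual API. The lock is held only while handing out an ID. When the current block is exhausted, the method starts at most one async fetch and returns immediately instead of waiting on it.

```java
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch, not Kafka's RPCProducerIdManager.
final class NonBlockingProducerIdManager {
    // Hypothetical stand-in for the AllocateProducerIds request to the controller.
    // The future completes with {firstId, blockSize}.
    private final Supplier<CompletableFuture<long[]>> allocateBlock;

    private long nextId = 0;
    private long blockEnd = 0;                      // exclusive; nextId >= blockEnd means exhausted
    private CompletableFuture<long[]> inFlight = null;

    NonBlockingProducerIdManager(Supplier<CompletableFuture<long[]>> allocateBlock) {
        this.allocateBlock = allocateBlock;
    }

    // Returns a producer ID, or empty if the caller should retry later.
    // Never waits on the controller while holding the lock.
    synchronized OptionalLong generateProducerId() {
        // If the current block is used up and a fetched block has arrived, install it.
        if (nextId >= blockEnd && inFlight != null && inFlight.isDone()) {
            try {
                long[] block = inFlight.join();
                nextId = block[0];
                blockEnd = block[0] + block[1];
            } catch (RuntimeException e) {
                // The fetch failed; a fresh one is started below.
            } finally {
                inFlight = null;
            }
        }
        if (nextId < blockEnd) {
            return OptionalLong.of(nextId++);
        }
        // No IDs available: start a single fetch if none is pending, then return at once.
        if (inFlight == null) {
            inFlight = allocateBlock.get();
        }
        return OptionalLong.empty();
    }
}
```

An empty result would presumably be turned into a retriable error for the client so that it retries the InitProducerId request; which error code Kafka would actually use is not stated in the ticket.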
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100722811 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1529,6 +1539,11 @@ private void resetToEmptyState() { */ private final ReplicationControlManager replicationControl; +/** + * Manages SCRAM credentials, if there are any. + */ +private final ScramControlManager scramControl; Review Comment: Fixed
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100722578 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -53,7 +53,8 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage} +import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, Review Comment: Fixed
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100722376 ## core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala: ## @@ -85,15 +113,17 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(), ) requests.foreach(request => { - val response = sendAlterUserScramCredentialsRequest(request) + val response = sendAlterUserScramCredentialsRequest(request, adminSocketServer) val results = response.data.results assertEquals(2, results.size) checkAllErrorsAlteringCredentials(results, Errors.DUPLICATE_RESOURCE, "when altering the same credential twice in a single request") }) } - @Test - def testAlterEmptyUser(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("kraft", "zk")) + def testAlterEmptyUser(quorum: String): Unit = { + println("Starting test") Review Comment: Fix
[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1100705771 ## clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java: ## @@ -212,24 +248,32 @@ public static ApiVersionCollection collectApis(Set apiKeys) { * @param listenerType the listener type which constrains the set of exposed APIs * @param minRecordVersion min inter broker magic * @param activeControllerApiVersions controller ApiVersions + * @param enableUnstableLastVersion whether unstable versions should be advertised or not Review Comment: Do we have a place where all this new unstable API usage is documented? It could be a bit confusing for folks who didn't take a look at this PR.
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1100690761 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio Map translatedOffsets = backupClient.remoteConsumerOffsets( consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L)); return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) && - translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); + !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); Review Comment: Hmm... if it's only a matter of reading to the end of the topic and then committing offsets, I think I'd prefer to have more coverage in this test. We do have assertions for regular checkpointing/syncing logic in other tests, but AFAICT we don't have anything to explicitly test the transition for a single consumer group from not being synced (even though other groups are being synced) to being synced. If we do have coverage for that somewhere else, then let me know and we can resolve this comment with no further action.
[jira] [Commented] (KAFKA-13659) MM2 should read all offset syncs at start up
[ https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686112#comment-17686112 ] Chris Egerton commented on KAFKA-13659: --- [~dorwi] thanks for filing this issue. I've removed part of the description that refers to syncing consumer offsets on the downstream cluster with offsets beyond the end of the log, since that's already captured with KAFKA-12468 and a corresponding [PR for that issue|https://github.com/apache/kafka/pull/13178]. The remaining part of this ticket is still worth pursuing, and I'd be interested in seeing if we can merge a fix for it. Are you still interested in pursuing this ticket, or would it be okay to reassign to someone else? > MM2 should read all offset syncs at start up > > > Key: KAFKA-13659 > URL: https://issues.apache.org/jira/browse/KAFKA-13659 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Kanalas Vidor >Assignee: Kanalas Vidor >Priority: Major > > MirrorCheckpointTask uses OffsetSyncStore, and does not check whether > OffsetSyncStore managed to read to the "end" of the offset-syncs topic. > OffsetSyncStore should fetch the endoffset of the topic at startup, and set a > flag when it finally reaches the endoffset in consumption. > MirrorCheckpointTask.poll should wait for this flag to be true before doing > any in-memory updates and group offset management.
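The read-to-end check the ticket proposes could be sketched roughly as follows. `ReadToEndTracker` is a hypothetical helper, not part of MirrorMaker 2, and partitions are reduced to integer ids for brevity. In a real task, the startup end offsets and the current positions would plausibly come from the consumer's `endOffsets(...)` and `position(...)` methods.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the KAFKA-13659 proposal: remember the end offsets
// of the offset-syncs topic at startup and report readiness only once consumption
// has caught up to them.
final class ReadToEndTracker {
    private final Map<Integer, Long> endOffsetsAtStartup;            // partition -> end offset
    private final Map<Integer, Long> positions = new HashMap<>();    // partition -> next offset to read

    ReadToEndTracker(Map<Integer, Long> endOffsetsAtStartup) {
        this.endOffsetsAtStartup = new HashMap<>(endOffsetsAtStartup);
    }

    // Record the consumer's position (next offset to be read) for a partition.
    void updatePosition(int partition, long position) {
        positions.put(partition, position);
    }

    // True once every partition has been consumed up to its startup end offset.
    // Partitions that have never been polled are treated as being at position 0.
    boolean readToEnd() {
        for (Map.Entry<Integer, Long> e : endOffsetsAtStartup.entrySet()) {
            long position = positions.getOrDefault(e.getKey(), 0L);
            if (position < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

Under this sketch, MirrorCheckpointTask.poll would return early, emitting no checkpoints and syncing no group offsets, while `readToEnd()` is still false.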
[jira] [Updated] (KAFKA-13659) MM2 should read all offset syncs at start up
[ https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-13659: -- Summary: MM2 should read all offset syncs at start up (was: MM2 should read all offset syncs at start up and should not set consumer offset higher than the end offset) > MM2 should read all offset syncs at start up > > > Key: KAFKA-13659 > URL: https://issues.apache.org/jira/browse/KAFKA-13659 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Kanalas Vidor >Assignee: Kanalas Vidor >Priority: Major > > MirrorCheckpointTask uses OffsetSyncStore, and does not check whether > OffsetSyncStore managed to read to the "end" of the offset-syncs topic. > OffsetSyncStore should fetch the endoffset of the topic at startup, and set a > flag when it finally reaches the endoffset in consumption. > MirrorCheckpointTask.poll should wait for this flag to be true before doing > any in-memory updates and group offset management.
[jira] [Updated] (KAFKA-13659) MM2 should read all offset syncs at start up and should not set consumer offset higher than the end offset
[ https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-13659: -- Description: MirrorCheckpointTask uses OffsetSyncStore, and does not check whether OffsetSyncStore managed to read to the "end" of the offset-syncs topic. OffsetSyncStore should fetch the endoffset of the topic at startup, and set a flag when it finally reaches the endoffset in consumption. MirrorCheckpointTask.poll should wait for this flag to be true before doing any in-memory updates and group offset management. (was: - MirrorCheckpointTask uses OffsetSyncStore, and does not check whether OffsetSyncStore managed to read to the "end" of the offset-syncs topic. OffsetSyncStore should fetch the endoffset of the topic at startup, and set a flag when it finally reaches the endoffset in consumption. MirrorCheckpointTask.poll should wait for this flag to be true before doing any in-memory updates and group offset management. - MirrorCheckpointTask can create checkpoints which point into the "future" - meaning it sometimes translates consumer offsets in a way that the target offset is greater than the endoffset of the replica topic partition. MirrorCheckpointTask should fetch the endoffsets of the affected topics, and make sure that it does not try to set the consumer offset to anything higher than the endoffset.) > MM2 should read all offset syncs at start up and should not set consumer > offset higher than the end offset > -- > > Key: KAFKA-13659 > URL: https://issues.apache.org/jira/browse/KAFKA-13659 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Kanalas Vidor >Assignee: Kanalas Vidor >Priority: Major > > MirrorCheckpointTask uses OffsetSyncStore, and does not check whether > OffsetSyncStore managed to read to the "end" of the offset-syncs topic. 
> OffsetSyncStore should fetch the endoffset of the topic at startup, and set a > flag when it finally reaches the endoffset in consumption. > MirrorCheckpointTask.poll should wait for this flag to be true before doing > any in-memory updates and group offset management.
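The part of the old description that was split off (now tracked under KAFKA-12468 and PR #13178) amounts to capping each translated consumer offset at the downstream partition's end offset. The following is a hypothetical sketch of that clamp, with partitions keyed by plain strings rather than `TopicPartition`. `CheckpointClamp` is an illustrative name, not MirrorMaker 2 code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the original proposal; not MirrorMaker 2 code.
final class CheckpointClamp {
    // Caps each translated offset at the downstream end offset so that a checkpoint
    // never points "into the future". Partitions with no known end offset are unchanged.
    static Map<String, Long> clampToEndOffsets(Map<String, Long> translated, Map<String, Long> endOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : translated.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            result.put(e.getKey(), end == null ? e.getValue() : Math.min(e.getValue(), end));
        }
        return result;
    }
}
```

PR #13178 addresses the same symptom differently, by making the offset translation itself conservative, so this clamp only illustrates the original proposal.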
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1100680887 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { produceMessages(primary, "test-topic-1"); -ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy(); -String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1"); - -// Check offsets are pushed to the checkpoint topic -Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal"); -waitForCondition(() -> { -ConsumerRecords records = backupConsumer.poll(Duration.ofSeconds(1L)); -for (ConsumerRecord record : records) { -Checkpoint checkpoint = Checkpoint.deserializeRecord(record); -if (remoteTopic.equals(checkpoint.topicPartition().topic())) { -return true; -} -} -return false; -}, 30_000, -"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1" -); Review Comment: I think that makes sense. I'll add some explicitly committed offsets here as a quick fix, and re-add the assertion.
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1100676199 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { produceMessages(primary, "test-topic-1"); -ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy(); -String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1"); - -// Check offsets are pushed to the checkpoint topic -Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal"); -waitForCondition(() -> { -ConsumerRecords records = backupConsumer.poll(Duration.ofSeconds(1L)); -for (ConsumerRecord record : records) { -Checkpoint checkpoint = Checkpoint.deserializeRecord(record); -if (remoteTopic.equals(checkpoint.topicPartition().topic())) { -return true; -} -} -return false; -}, 30_000, -"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1" -); Review Comment: Thanks for the explanation, that clears things up nicely. > Upon further inspection, maybe this test makes sense to turn into a parameter for other tests, to verify that the functionality of the checkpoints when the syncs topic is moved around. Does that make sense to do in this PR? I'm hesitant to drop this coverage from the test, because it seems at least as useful to verify that offset sync records are published in the expected location (and translated to consumer offsets) as it is to verify that they aren't published in the incorrect location. My personal preference would be to add the minimal additional logic necessary to this test to keep that coverage, and then pursue parameterization of offset sync location out of band. 
I'm also not even sure that parameterization would be worth the tradeoff in build/test time, although we might be able to find a healthy compromise there. WDYT?
[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1423233290 > Oh, and the Jenkins build seems to be consistently failing on the IdentityReplicationIntegrationTest::testNoCheckpointsIfNoRecordsAreMirrored test case. Probably want to look into that before we merge It looks like this was failing due to a typo in my offset.flush.interval.ms overrides. This should be fixed now.
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1100667177 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -469,7 +461,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio // create a consumer at primary cluster to consume the new topic try (Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"group.id", "consumer-group-1"), "test-topic-2")) { Review Comment: Ah, now I see (was confusing the total end offsets with the total consumer group offsets). Thanks for the explanation!
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1100654263 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr // Offset is too far in the past to translate accurately return OptionalLong.of(-1L); } -long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset(); -return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep); +return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1)); Review Comment: IMO the diagram is a bit overkill, but I tend not to be a very visual thinker. It certainly doesn't do any harm; I'm fine with leaving it in. Thanks for adding detail here, hopefully it'll come in handy if anyone needs to revisit this logic in the future.
[jira] [Resolved] (KAFKA-14674) Backport fix for KAFKA-14455 to 3.3 and 3.4 branches
[ https://issues.apache.org/jira/browse/KAFKA-14674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-14674. --- Resolution: Fixed Backported to [3.3|https://github.com/apache/kafka/commit/2aeceff25a4a6a0581adf779b77cd94838b4e224] and [3.4|https://github.com/apache/kafka/commit/e35ade9fb9d34f75b102ea2371db4d137866d464]. > Backport fix for KAFKA-14455 to 3.3 and 3.4 branches > > > Key: KAFKA-14674 > URL: https://issues.apache.org/jira/browse/KAFKA-14674 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Fix For: 3.4.1, 3.3.3 > > > The 3.4 branch is currently frozen for the upcoming 3.4.0 release. Once the > release is complete, we can and should backport the change in > [https://github.com/apache/kafka/pull/12984] onto the 3.3 and 3.4 branches > before putting out any subsequent releases on those branches.
[GitHub] [kafka] C0urante commented on pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
C0urante commented on PR #12984: URL: https://github.com/apache/kafka/pull/12984#issuecomment-1423192295 Finished backporting to 3.3 and 3.4, now that both active releases on those branches have concluded.
[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1423176443 > I notice that we don't do a read-to-end of the offset syncs topic in MirrorCheckpointTask before we begin syncing consumer group offsets, and we begin reading that topic from the beginning. This may cause us to sync offsets based on stale checkpoints if there are later checkpoints available in the topic that we haven't consumed yet. Do you think it might make sense to add a read-to-end for the offset syncs topic before we begin syncing consumer group offsets in the checkpoint connector? Ouch, yeah, that is certainly an issue, and it gets worse with my change. Before, the checkpoints would be non-monotonic only for transactional/compacted topics; after, they're non-monotonic for everyone. I think addressing this in a follow-up is a smart idea; this change is already messy enough.
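The read-to-end idea discussed above can be sketched without a broker: snapshot the topic's end offset, keep consuming from the beginning until the position reaches it, and only then start translating. The in-memory `InMemorySyncTopic` below is a hypothetical stand-in for a consumer of one offset syncs partition; a real implementation would use consumer APIs such as `KafkaConsumer#endOffsets` and `KafkaConsumer#position` instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one partition of the offset syncs topic.
final class InMemorySyncTopic {
    private final List<long[]> records = new ArrayList<>(); // each record: {upstream, downstream}

    void append(long upstreamOffset, long downstreamOffset) {
        records.add(new long[] {upstreamOffset, downstreamOffset});
    }

    long endOffset() {
        return records.size();
    }

    // Return up to maxRecords records starting at position (roughly one poll()).
    List<long[]> poll(long position, int maxRecords) {
        int from = (int) position;
        int to = Math.min(records.size(), from + maxRecords);
        return new ArrayList<>(records.subList(from, to));
    }
}

final class SyncReader {
    // Consume from the beginning until reaching the end offset captured up front,
    // and return the latest sync seen (null if the topic was empty).
    static long[] readToEnd(InMemorySyncTopic topic, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        long end = topic.endOffset(); // snapshot: records appended later are not awaited
        long position = 0;
        long[] latest = null;
        while (position < end) {
            List<long[]> batch = topic.poll(position, batchSize);
            for (long[] sync : batch) {
                latest = sync;
            }
            position += batch.size();
        }
        return latest;
    }
}
```

With syncs (0, 0), (100, 40), (200, 90) and a batch size of 2, a task that started translating after the first batch would use (100, 40) even though (200, 90) is already in the topic; reading to the snapshotted end avoids that.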
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1100622077 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr // Offset is too far in the past to translate accurately return OptionalLong.of(-1L); } -long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset(); -return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep); +return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1)); Review Comment: Let me know if this description makes sense to you, and if the diagram adds any value.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1100550240 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int cnt = 0; for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++) for (int p = 0; p < numPartitions; p++) -cluster.kafka().produce(topicName, p, "key", "value-" + cnt++); +produce(cluster, topicName, p, "key", "value-" + cnt++); } - + +protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) { +cluster.kafka().produce(topic, partition, key, value); +} + +protected static Map waitForAnyCheckpoint( Review Comment: It's `any` in contrast to the other method's `FullSync`. As in, it accepts checkpoints anywhere within a topic, not just at the end of the topic. I'll update the method name to be more precise and verbose :) ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ## @@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio // have been automatically synchronized from primary to backup by the background job, so no // more records to consume from the replicated topic by the same consumer group at backup cluster assertEquals(0, records.count(), "consumer record size is not zero"); +backupConsumer.close(); Review Comment: I code-golfed this one, but there's no reason to. Updated. 
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets( consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L)); return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) && - translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); + !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); Review Comment: 1. This was passing before because `offset.lag.max` was nonzero, leaving a (0,0) offset sync in the topic. 2. This condition is part of `waitForAnyCheckpoint/waitForCheckpointOnAllPartitions`, which is used in `testReplication`. I think that test should be sufficient to cover the normal case where offsets are translated. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { produceMessages(primary, "test-topic-1"); -ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy(); -String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1"); - -// Check offsets are pushed to the checkpoint topic -Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal"); -waitForCondition(() -> { -ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L)); -for (ConsumerRecord<byte[], byte[]> record : records) { -Checkpoint checkpoint = Checkpoint.deserializeRecord(record); -if (remoteTopic.equals(checkpoint.topicPartition().topic())) { -return true; -} -} -return false; -}, 30_000, -"Unable to find checkpoints for "
+ PRIMARY_CLUSTER_ALIAS + ".test-topic-1" -); Review Comment: This assertion relied on a nonzero `offset.lag.max`, where the sync topic could be left with a (0,0) record. The offset being translated in this case belongs to `consumer-group-dummy` from the setup method, which is at offset 0 (the beginning of the topic). Now that `offset.lag.max` is zero and the starvation bug is fixed, the sync topic may only see nonzero syncs, which cannot translate offset 0. I removed this assertion because it was failing and didn't seem to concern this test. Upon further inspection, it may make sense to turn this test into a parameter for other tests, to verify the functionality of the checkpoints when the syncs topic is moved around. Does that make sense to do in this PR? ##
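The difference between the two wait helpers described above ("any" checkpoint, anywhere within the topic, vs. a full sync at the end of the topic) can be sketched as two predicates over translated offsets. This is a simplified, hypothetical model keyed by partition name; the real test helpers poll `MirrorClient#remoteConsumerOffsets` and compare `TopicPartition` keys, as in the quoted diff.

```java
import java.util.Map;
import java.util.Set;

final class CheckpointPredicates {
    // "Any": every expected partition has a translated offset, wherever it falls.
    static boolean hasAnyCheckpoint(Map<String, Long> translated, Set<String> partitions) {
        return translated.keySet().containsAll(partitions);
    }

    // "Full sync": every expected partition is translated exactly to its end offset.
    static boolean isFullySynced(Map<String, Long> translated, Map<String, Long> endOffsets) {
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            Long offset = translated.get(e.getKey());
            if (offset == null || !offset.equals(e.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```

Under the new translation rule, a checkpoint may legitimately land mid-topic, so only the weaker "any" predicate is safe to wait on in tests that don't expect a full sync.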
[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
C0urante commented on PR #13137: URL: https://github.com/apache/kafka/pull/13137#issuecomment-1423159660 Thanks Mickael, I've addressed your suggestions and rebased.
[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
junrao commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1100515746 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. + There is no need to advance the state. + + When started, the tier state machine will fetch the local log start offset of the + leader and then build the follower's remote log aux state until the leader's + local log start offset. 
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); Review Comment: This seems a bit weird. The log is typically an instance-level object.
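One reading of junrao's point is that a per-instance logger lets each message carry context identifying which fetcher or partition it came from. The sketch below is hypothetical: it uses `java.util.logging` rather than slf4j, and the prefix format is only illustrative (Kafka code often achieves the same with a `LogContext` prefix).

```java
import java.util.logging.Logger;

// Hypothetical class: an instance-level logger with per-instance context.
final class FetcherTierStateLogger {
    private final Logger log;
    private final String logPrefix;

    FetcherTierStateLogger(int replicaId, int leaderId) {
        // Each instance carries its own prefix, so messages identify the fetcher.
        this.logPrefix = "[ReplicaFetcher replicaId=" + replicaId + ", leaderId=" + leaderId + "] ";
        this.log = Logger.getLogger(FetcherTierStateLogger.class.getName());
    }

    String format(String message) {
        return logPrefix + message;
    }

    void info(String message) {
        log.info(format(message));
    }
}
```

The underlying `Logger` is still shared by name; what becomes per-instance is the context attached to each message, which a `static` field cannot provide.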
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
divijvaidya commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100593799 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -238,7 +238,7 @@ public KafkaClusterTestKit build() throws Exception { bootstrapMetadata); } catch (Throwable e) { log.error("Error creating controller {}", node.id(), e); -Utils.swallow(log, "sharedServer.stopForController", () -> sharedServer.stopForController()); +Utils.swallow(log, "sharedServer.stopForController", () -> sharedServer.stopForController(), null); Review Comment: Ah! I intended to do that. Done now.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
divijvaidya commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100593578 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -998,15 +998,18 @@ public static void closeAll(Closeable... closeables) throws IOException { throw exception; } -public static void swallow( -Logger log, -String what, -Runnable runnable -) { -try { -runnable.run(); -} catch (Throwable e) { -log.warn("{} error", what, e); +/** + * Run the supplied code. If an exception is thrown, it is swallowed and registered to the firstException parameter. + */ +public static void swallow(Logger log, String what, final Runnable code, final AtomicReference<Throwable> firstException) { +if (code != null) { +try { +code.run(); +} catch (Throwable t) { +log.warn("{} error", what, t); Review Comment: Fair point, though not all "swallows" will require the error level. So I created a generic logging function based on `CoreUtils.swallow` and then used the error level for this specific instance of closing the fetcher and coordinator. I have also updated the comment. Let me know if that looks right; happy to change it further.
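A swallow helper with a configurable log level, as described in the comment above, might look roughly like this. It mirrors the shape of the proposed `Utils.swallow` but is only a sketch: it uses `java.util.logging` instead of slf4j so it runs on the JDK alone, and the `Level` parameter is an assumption about how the "generic logging function" could be exposed.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

final class SwallowUtils {
    // Run code; if it throws, log at the given level and remember the first failure
    // (when firstException is non-null) instead of propagating it.
    static void swallow(Logger log, Level level, String what, Runnable code,
                        AtomicReference<Throwable> firstException) {
        if (code == null) {
            return;
        }
        try {
            code.run();
        } catch (Throwable t) {
            log.log(level, what + " error", t);
            if (firstException != null) {
                // Keep only the first failure; later ones are logged but not recorded.
                firstException.compareAndSet(null, t);
            }
        }
    }
}
```

A caller closing several components (say, a fetcher and then a coordinator) can pass one shared `AtomicReference` and, after all closes have run, rethrow `firstException.get()` if it is non-null, which is roughly the pattern `KafkaConsumer#close` follows with its own `firstException` reference.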
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
divijvaidya commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100592340 ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -2455,7 +2454,7 @@ private void close(Duration timeout, boolean swallowException) { closeTimer.reset(remainingDurationInTimeout); // This is a blocking call bound by the time remaining in closeTimer -LambdaUtils.swallow(() -> fetcher.close(closeTimer), firstException); +Utils.swallow(log, " fetcher close", () -> fetcher.close(closeTimer), firstException); Review Comment: fixed
[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
C0urante commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1100587361 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.rest.resources; + +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.NotFoundException; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.UriInfo; +import java.util.List; +import java.util.Map; + +@Path("/{source}/{target}/connectors") +public class InternalMirrorResource extends InternalClusterResource { + +@Context +private UriInfo uriInfo; + +private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class); + +private final Map<SourceAndTarget, Herder> herders; + +public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) { +super(restClient); +this.herders = herders; +} + +@Override +protected Herder herderForRequest() { +String source = pathParam("source"); +String target = pathParam("target"); +Herder result = herders.get(new SourceAndTarget(source, target)); +if (result == null) { +log.debug("Failed to find herder for source '{}' and target '{}'", source, target); Review Comment: I was under the impression that throwing this would only cause the exception message to be present in the REST response (and presumably the logs for the worker that issued the request), but after revisiting the `ConnectExceptionMapper` class, it appears that we'll also log the exception at debug level on this worker. So yep, we can remove this.
[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
C0urante commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1100585212 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.rest.resources; + +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.NotFoundException; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.UriInfo; +import java.util.List; +import java.util.Map; + +@Path("/{source}/{target}/connectors") +public class InternalMirrorResource extends InternalClusterResource { + +@Context +private UriInfo uriInfo; + +private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class); + +private final Map<SourceAndTarget, Herder> herders; + +public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) { +super(restClient); +this.herders = herders; +} + +@Override +protected Herder herderForRequest() { +String source = pathParam("source"); +String target = pathParam("target"); +Herder result = herders.get(new SourceAndTarget(source, target)); +if (result == null) { +log.debug("Failed to find herder for source '{}' and target '{}'", source, target); +throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'"); +} +return result; +} + +private String pathParam(String name) { +List<String> result = uriInfo.getPathParameters().get(name); +if (result == null || result.isEmpty()) +throw new NotFoundException(); Review Comment: Yeah, couldn't hurt. This is almost guaranteed to end up in a log file on another worker, so in the unlikely event that this code path is somehow reached, more detail would be preferable to less.
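The request routing discussed in the last two comments (resolve `/{source}/{target}/connectors` to a herder, and fail with a descriptive "not found" when either the path parameter or the flow is missing) can be sketched with plain Java. Everything here is hypothetical: `FlowKey` stands in for `SourceAndTarget`, `FlowNotFoundException` for `javax.ws.rs.NotFoundException`, and the missing-parameter message is only illustrative wording.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical analogue of SourceAndTarget: a value-based map key.
final class FlowKey {
    private final String source;
    private final String target;

    FlowKey(String source, String target) {
        this.source = source;
        this.target = target;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FlowKey)) return false;
        FlowKey other = (FlowKey) o;
        return source.equals(other.source) && target.equals(other.target);
    }

    @Override
    public int hashCode() {
        return Objects.hash(source, target);
    }
}

// Hypothetical stand-in for javax.ws.rs.NotFoundException (HTTP 404).
final class FlowNotFoundException extends RuntimeException {
    FlowNotFoundException(String message) {
        super(message);
    }
}

final class FlowRouter<H> {
    private final Map<FlowKey, H> herders;

    FlowRouter(Map<FlowKey, H> herders) {
        this.herders = herders;
    }

    // The exception message carries the detail, so no separate debug log is emitted.
    H herderFor(Map<String, List<String>> pathParams) {
        String source = pathParam(pathParams, "source");
        String target = pathParam(pathParams, "target");
        H result = herders.get(new FlowKey(source, target));
        if (result == null) {
            throw new FlowNotFoundException("No replication flow found for source '"
                + source + "' and target '" + target + "'");
        }
        return result;
    }

    private static String pathParam(Map<String, List<String>> pathParams, String name) {
        List<String> values = pathParams.get(name);
        if (values == null || values.isEmpty()) {
            // Detailed message, since it may end up in another worker's logs.
            throw new FlowNotFoundException("Missing path parameter '" + name + "'");
        }
        return values.get(0);
    }
}
```

The key class needs value-based `equals`/`hashCode`; otherwise a freshly constructed key for each request would never match the one used to populate the map.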
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100580647 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.image.writer.ImageWriter; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.clients.admin.ScramMechanism; + +// import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +// import java.util.function.Consumer; +import java.util.stream.Collectors; + + +/** + * Represents the SCRAM credentials in the metadata image. + * + * This class is thread-safe. 
+ */ +public final class ScramImage { +public static final ScramImage EMPTY = new ScramImage(Collections.emptyMap()); + +private final Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms; + +public ScramImage(Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms) { +this.mechanisms = Collections.unmodifiableMap(mechanisms); +} + +public void write(ImageWriter writer, ImageWriterOptions options) { +for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : mechanisms.entrySet()) { +for (Entry<String, ScramCredentialData> userEntry : mechanismEntry.getValue().entrySet()) { +writer.write(0, userEntry.getValue().toRecord(userEntry.getKey(), mechanismEntry.getKey())); +} +} +} + +public Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms() { +return mechanisms; +} + +public boolean isEmpty() { +return mechanisms.isEmpty(); +} + +@Override +public int hashCode() { +return mechanisms.hashCode(); +} + +@Override +public boolean equals(Object o) { +if (o == null) return false; +if (!o.getClass().equals(ScramImage.class)) return false; +ScramImage other = (ScramImage) o; +return mechanisms.equals(other.mechanisms); +} + +@Override +public String toString() { +StringBuilder builder = new StringBuilder(); +builder.append("ScramImage("); +List<ScramMechanism> sortedMechanisms = mechanisms.keySet().stream().sorted().collect(Collectors.toList()); +String preMechanismComma = ""; +for (ScramMechanism mechanism : sortedMechanisms) { +builder.append(preMechanismComma).append(mechanism).append(": {"); +Map<String, ScramCredentialData> userMap = mechanisms.get(mechanism); +List<String> sortedUserNames = userMap.keySet().stream().sorted().collect(Collectors.toList()); +String preUserNameComma = ""; +for (String userName : sortedUserNames) { + builder.append(preUserNameComma).append(userName).append("=").append(userMap.get(userName)); Review Comment: As far as I can tell, the toString method is never used. I think it is useful for debugging, but I'd like to know how we handle this in general.
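The `toString` above sorts mechanisms and user names so that its output does not depend on hash-map iteration order, which is what makes it useful when comparing images in debug logs or test failures. A stdlib-only sketch of the same idea over nested maps, assuming `String` keys and values (the real class keys the outer map by `ScramMechanism`, and the separators here are a guess):

```java
import java.util.Map;
import java.util.TreeMap;

final class SortedImageFormatter {
    // Render nested maps with sorted keys so output is stable across runs.
    static String format(String name, Map<String, Map<String, String>> mechanisms) {
        StringBuilder builder = new StringBuilder(name).append("(");
        String mechanismSep = "";
        for (Map.Entry<String, Map<String, String>> mech : new TreeMap<>(mechanisms).entrySet()) {
            builder.append(mechanismSep).append(mech.getKey()).append(": {");
            String userSep = "";
            for (Map.Entry<String, String> user : new TreeMap<>(mech.getValue()).entrySet()) {
                builder.append(userSep).append(user.getKey()).append("=").append(user.getValue());
                userSep = ", ";
            }
            builder.append("}");
            mechanismSep = ", ";
        }
        return builder.append(")").toString();
    }
}
```

Copying into a `TreeMap` is a simple way to get sorted iteration; it costs an extra copy, which is usually acceptable for a debugging aid.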
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100579983 ## metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; +import org.apache.kafka.common.security.scram.ScramCredential; +import org.apache.kafka.common.security.scram.internals.ScramFormatter; +import org.apache.kafka.common.utils.Bytes; + +import java.security.GeneralSecurityException; +import java.util.Arrays; +import java.util.Objects; + + +/** + * Represents the ACLs in the metadata image. + * + * This class is thread-safe. 
+ */ +public final class ScramCredentialData { +private final byte[] salt; +private final byte[] saltedPassword; +private final int iterations; + +static ScramCredentialData fromRecord( +UserScramCredentialRecord record +) { +return new ScramCredentialData( +record.salt(), +record.saltedPassword(), +record.iterations()); +} + +public ScramCredentialData( +byte[] salt, +byte[] saltedPassword, +int iterations +) { +this.salt = salt; +this.saltedPassword = saltedPassword; +this.iterations = iterations; +} + +public byte[] salt() { +return salt; +} + +public byte[] saltedPassword() { +return saltedPassword; +} + +public int iterations() { +return iterations; +} + +public UserScramCredentialRecord toRecord( +String userName, +ScramMechanism mechanism +) { +return new UserScramCredentialRecord(). +setName(userName). +setMechanism(mechanism.type()). +setSalt(salt). +setSaltedPassword(saltedPassword). +setIterations(iterations); +} + +public ScramCredential toCredential( +ScramMechanism mechanism +) throws GeneralSecurityException { +org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism = + org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName()); +ScramFormatter formatter = new ScramFormatter(internalMechanism); +return new ScramCredential(salt, +formatter.storedKey(formatter.clientKey(saltedPassword)), +formatter.serverKey(saltedPassword), +iterations); +} + +@Override +public int hashCode() { +return Objects.hash(salt, saltedPassword, iterations); +} + +@Override +public boolean equals(Object o) { +if (o == null) return false; +if (!o.getClass().equals(ScramCredentialData.class)) return false; +ScramCredentialData other = (ScramCredentialData) o; +return Arrays.equals(salt, other.salt) && +Arrays.equals(saltedPassword, other.saltedPassword) && +iterations == other.iterations; +} + +@Override +public String toString() { +return "ScramCredentialData" + Review Comment: As far as I can tell, the 
toString method is never used. I think it is useful for debugging, but I'd like to know how we handle this in general.
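One detail in the quoted `ScramCredentialData` may be worth checking: `equals` compares `salt` and `saltedPassword` by content with `Arrays.equals`, but `hashCode` passes the arrays straight to `Objects.hash`, which calls each `byte[]`'s identity-based `hashCode()`. Two equal instances built from different array copies can then report different hash codes, which breaks the `equals`/`hashCode` contract for hash-based collections. A sketch of a content-consistent pair, using a hypothetical `CredentialBytes` class:

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical class with the same three fields as ScramCredentialData.
final class CredentialBytes {
    private final byte[] salt;
    private final byte[] saltedPassword;
    private final int iterations;

    CredentialBytes(byte[] salt, byte[] saltedPassword, int iterations) {
        this.salt = salt;
        this.saltedPassword = saltedPassword;
        this.iterations = iterations;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || o.getClass() != CredentialBytes.class) return false;
        CredentialBytes other = (CredentialBytes) o;
        return Arrays.equals(salt, other.salt)
            && Arrays.equals(saltedPassword, other.saltedPassword)
            && iterations == other.iterations;
    }

    @Override
    public int hashCode() {
        // Hash array contents, not array identity, to stay consistent with equals.
        return Objects.hash(Arrays.hashCode(salt), Arrays.hashCode(saltedPassword), iterations);
    }
}
```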
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100576351

## metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java: ##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM;
+
+
+/**
+ * Manages SCRAM credentials.
+ */
+public class ScramControlManager {
+    static final int MAX_ITERATIONS = 16384;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        ScramControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+            return new ScramControlManager(logContext,
+                snapshotRegistry);
+        }
+    }
+
+    static class ScramCredentialKey {
+        private final String username;
+        private final ScramMechanism mechanism;
+
+        ScramCredentialKey(String username, ScramMechanism mechanism) {
+            this.username = username;
+            this.mechanism = mechanism;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(username, mechanism);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null) return false;
+            if (!(o.getClass() == this.getClass())) return false;
+            ScramCredentialKey other = (ScramCredentialKey) o;
+            return username.equals(other.username) &&
+                mechanism.equals(other.mechanism);
+        }
+
+        @Override
+        public String toString() {
+            return "ScramCredentialKey" +
+                "(username=" + username +
+                ", mechanism=" + mechanism +
+                ")";
+        }
+    }
+
+    static class ScramCredentialValue {
+        private final byte[] salt;
+        private final byte[] saltedPassword;
+        private final int iterations;
+
+        ScramCredentialValue(
+            byte[] salt,
+            byte[] saltedPassword,
+            int iterations
+        ) {
+            this.salt = salt;
+            this.saltedPassword = saltedPassword;
+            this.iterations = iterations;
+        }
+
+        @Override
+
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100573646

## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ##
@@ -221,6 +223,21 @@ class BrokerMetadataPublisher(
           s"quotas in ${deltaName}", t)
       }
+      // Apply changes to SCRAM credentials.
+      Option(delta.scramDelta()).foreach { scramDelta =>

Review Comment:
The credentials are written to the metadata log, consumed by the brokers, and then added to each broker's credential cache.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
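The review comment describes the flow: SCRAM credential changes are written to the metadata log, and each broker replays them into a local cache. Below is a minimal sketch of such a broker-side cache. The class and method names are hypothetical (not Kafka's), the key mirrors the `(username, mechanism)` pair of `ScramCredentialKey` in the diff, and the mechanism is a plain string so the example compiles without Kafka on the classpath.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical broker-side SCRAM credential cache; not a Kafka class.
public class ScramCredentialCacheSketch {
    // Composite key mirroring ScramCredentialKey: a record derives equals/hashCode from both fields.
    public record Key(String username, String mechanism) { }

    // Assumed shape of the credential material carried by an upsert.
    public record Credential(byte[] salt, byte[] saltedPassword, int iterations) { }

    // Concurrent map, assuming one thread applies metadata changes while others read.
    private final Map<Key, Credential> cache = new ConcurrentHashMap<>();

    // Replay an upsert read from the metadata log; a later upsert for the same key replaces the earlier one.
    public void applyUpsert(String username, String mechanism, Credential credential) {
        cache.put(new Key(username, mechanism), credential);
    }

    // Replay a removal read from the metadata log.
    public void applyRemoval(String username, String mechanism) {
        cache.remove(new Key(username, mechanism));
    }

    // Look up the credential that would be used to authenticate a SCRAM client.
    public Optional<Credential> lookup(String username, String mechanism) {
        return Optional.ofNullable(cache.get(new Key(username, mechanism)));
    }

    public static void main(String[] args) {
        ScramCredentialCacheSketch cache = new ScramCredentialCacheSketch();
        cache.applyUpsert("alice", "SCRAM-SHA-256", new Credential(new byte[]{1}, new byte[]{2}, 8192));
        System.out.println(cache.lookup("alice", "SCRAM-SHA-256").isPresent()); // true
        cache.applyRemoval("alice", "SCRAM-SHA-256");
        System.out.println(cache.lookup("alice", "SCRAM-SHA-256").isPresent()); // false
    }
}
```

Using a record for the key gives value-based `equals`/`hashCode` automatically, which is what the hand-written `ScramCredentialKey` in the diff implements explicitly.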
[jira] [Updated] (KAFKA-14693) KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio updated KAFKA-14693:
---
Description:
h1. Problem
When the KRaft controller encounters an error that it cannot handle, it calls {{ProcessExitingFaultHandler}}, which calls {{Exit.exit}}, which calls {{Runtime.exit}}. Based on the Runtime.exit documentation:
{quote}All registered [shutdown hooks|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang.Thread-], if any, are started in some unspecified order and allowed to run concurrently until they finish. Once this is done the virtual machine [halts|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#halt-int-].
{quote}
One of the shutdown hooks registered by Kafka is {{Server.shutdown()}}. This shutdown hook eventually calls {{KafkaEventQueue.close}}, which joins on the controller thread. Unfortunately, the controller thread is itself blocked in a join, waiting for the shutdown hook thread to finish, so neither thread can make progress.
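The cycle described above can be reproduced with plain JDK threads, without calling `Runtime.exit`. In the sketch below (a hypothetical class, not Kafka code), a "controller" thread waits on a "shutdown-hook" thread that is itself joining the controller. The controller's wait is bounded only so the program terminates; the unbounded joins in the report never return.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the join cycle in KAFKA-14693; thread names are illustrative.
public class JoinCycleSketch {
    // Returns true if the controller's bounded join expired while the hook thread was still alive,
    // i.e. each thread was waiting on the other.
    public static boolean joinCycleDetected(long controllerWaitMs) throws InterruptedException {
        CountDownLatch bothStarted = new CountDownLatch(1);
        AtomicBoolean hookStillAlive = new AtomicBoolean(false);
        Thread[] t = new Thread[2];

        // Stand-in for QuorumControllerEventHandler: exit() waits for the shutdown hooks.
        t[0] = new Thread(() -> {
            try {
                bothStarted.await();
                t[1].join(controllerWaitMs); // bounded so the sketch terminates
                hookStillAlive.set(t[1].isAlive());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "controller");

        // Stand-in for kafka-shutdown-hook: close() joins the controller thread.
        t[1] = new Thread(() -> {
            try {
                bothStarted.await();
                t[0].join(); // returns only once the controller gives up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "shutdown-hook");

        t[0].start();
        t[1].start();
        bothStarted.countDown(); // both threads are alive before either starts joining
        t[0].join();
        t[1].join();
        return hookStillAlive.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(joinCycleDetected(200)); // true
    }
}
```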
Here are sample thread stacks:
{code:java}
"QuorumControllerEventHandler" #45 prio=5 os_prio=0 cpu=429352.87ms elapsed=620807.49s allocated=38544M defined_classes=353 tid=0x7f5aeb31f800 nid=0x80c in Object.wait() [0x7f5a658fb000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@17.0.5/Native Method)
	- waiting on
	at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
	- locked <0xa29241f8> (a org.apache.kafka.common.utils.KafkaThread)
	at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
	at java.lang.ApplicationShutdownHooks.runHooks(java.base@17.0.5/ApplicationShutdownHooks.java:107)
	at java.lang.ApplicationShutdownHooks$1.run(java.base@17.0.5/ApplicationShutdownHooks.java:46)
	at java.lang.Shutdown.runHooks(java.base@17.0.5/Shutdown.java:130)
	at java.lang.Shutdown.exit(java.base@17.0.5/Shutdown.java:173)
	- locked <0xffe020b8> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Runtime.exit(java.base@17.0.5/Runtime.java:115)
	at java.lang.System.exit(java.base@17.0.5/System.java:1860)
	at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43)
	at org.apache.kafka.common.utils.Exit.exit(Exit.java:66)
	at org.apache.kafka.common.utils.Exit.exit(Exit.java:62)
	at org.apache.kafka.server.fault.ProcessExitingFaultHandler.handleFault(ProcessExitingFaultHandler.java:54)
	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:891)
	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:874)
	at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:969)
{code}
and
{code:java}
"kafka-shutdown-hook" #35 prio=5 os_prio=0 cpu=43.42ms elapsed=378593.04s allocated=4732K defined_classes=74 tid=0x7f5a7c09d800 nid=0x4f37 in Object.wait() [0x7f5a47afd000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@17.0.5/Native Method)
	- waiting on
	at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
	- locked <0xa272bcb0> (a org.apache.kafka.common.utils.KafkaThread)
	at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
	at org.apache.kafka.queue.KafkaEventQueue.close(KafkaEventQueue.java:509)
	at org.apache.kafka.controller.QuorumController.close(QuorumController.java:2553)
	at kafka.server.ControllerServer.shutdown(ControllerServer.scala:521)
	at kafka.server.KafkaRaftServer.shutdown(KafkaRaftServer.scala:184)
	at kafka.Kafka$.$anonfun$main$3(Kafka.scala:99)
	at kafka.Kafka$$$Lambda$406/0x000800fb9730.apply$mcV$sp(Unknown Source)
	at kafka.utils.Exit$.$anonfun$addShutdownHook$1(Exit.scala:38)
	at kafka.Kafka$$$Lambda$407/0x000800fb9a10.run(Unknown Source)
	at java.lang.Thread.run(java.base@17.0.5/Thread.java:833)
	at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64)
{code}

h1. Possible Solution
A possible solution is to have the controller's unhandled fault handler call {{Runtime.halt}} instead of {{Runtime.exit}}.

was:
h1. Problem
When the KRaft controller encounters an error that it cannot handle, it calls `ProcessExitingFaultHandler`, which calls `Exit.exit`, which calls `Runtime.exit`. Based on the Runtime.exit documentation:
> All registered [shutdown
[jira] [Created] (KAFKA-14693) KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown
José Armando García Sancio created KAFKA-14693:
--
Summary: KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown
Key: KAFKA-14693
URL: https://issues.apache.org/jira/browse/KAFKA-14693
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 3.4.0
Reporter: José Armando García Sancio
Fix For: 3.4.1

h1. Problem
When the KRaft controller encounters an error that it cannot handle, it calls `ProcessExitingFaultHandler`, which calls `Exit.exit`, which calls `Runtime.exit`. Based on the Runtime.exit documentation:

> All registered [shutdown hooks|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang.Thread-], if any, are started in some unspecified order and allowed to run concurrently until they finish. Once this is done the virtual machine [halts|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#halt-int-].

One of the shutdown hooks registered by Kafka is `Server.shutdown()`. This shutdown hook eventually calls `KafkaEventQueue.close`, which joins on the controller thread. Unfortunately, the controller thread is itself blocked in a join, waiting for the shutdown hook thread to finish, so neither thread can make progress.
Here are sample thread stacks:
{code:java}
"QuorumControllerEventHandler" #45 prio=5 os_prio=0 cpu=429352.87ms elapsed=620807.49s allocated=38544M defined_classes=353 tid=0x7f5aeb31f800 nid=0x80c in Object.wait() [0x7f5a658fb000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@17.0.5/Native Method)
	- waiting on
	at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
	- locked <0xa29241f8> (a org.apache.kafka.common.utils.KafkaThread)
	at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
	at java.lang.ApplicationShutdownHooks.runHooks(java.base@17.0.5/ApplicationShutdownHooks.java:107)
	at java.lang.ApplicationShutdownHooks$1.run(java.base@17.0.5/ApplicationShutdownHooks.java:46)
	at java.lang.Shutdown.runHooks(java.base@17.0.5/Shutdown.java:130)
	at java.lang.Shutdown.exit(java.base@17.0.5/Shutdown.java:173)
	- locked <0xffe020b8> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Runtime.exit(java.base@17.0.5/Runtime.java:115)
	at java.lang.System.exit(java.base@17.0.5/System.java:1860)
	at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43)
	at org.apache.kafka.common.utils.Exit.exit(Exit.java:66)
	at org.apache.kafka.common.utils.Exit.exit(Exit.java:62)
	at org.apache.kafka.server.fault.ProcessExitingFaultHandler.handleFault(ProcessExitingFaultHandler.java:54)
	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:891)
	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:874)
	at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:969)
{code}
and
{code:java}
"kafka-shutdown-hook" #35 prio=5 os_prio=0 cpu=43.42ms elapsed=378593.04s allocated=4732K defined_classes=74 tid=0x7f5a7c09d800 nid=0x4f37 in Object.wait() [0x7f5a47afd000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@17.0.5/Native Method)
	- waiting on
	at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
	- locked <0xa272bcb0> (a org.apache.kafka.common.utils.KafkaThread)
	at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
	at org.apache.kafka.queue.KafkaEventQueue.close(KafkaEventQueue.java:509)
	at org.apache.kafka.controller.QuorumController.close(QuorumController.java:2553)
	at kafka.server.ControllerServer.shutdown(ControllerServer.scala:521)
	at kafka.server.KafkaRaftServer.shutdown(KafkaRaftServer.scala:184)
	at kafka.Kafka$.$anonfun$main$3(Kafka.scala:99)
	at kafka.Kafka$$$Lambda$406/0x000800fb9730.apply$mcV$sp(Unknown Source)
	at kafka.utils.Exit$.$anonfun$addShutdownHook$1(Exit.scala:38)
	at kafka.Kafka$$$Lambda$407/0x000800fb9a10.run(Unknown Source)
	at java.lang.Thread.run(java.base@17.0.5/Thread.java:833)
	at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64)
{code}

h1. Possible Solution
A possible solution is to have the controller's unhandled fault handler call `Runtime.halt` instead of `Runtime.exit`.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
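The proposed fix is to halt rather than exit. Below is a sketch of what such a handler could look like, assuming it only needs to log and terminate. `HaltingFaultHandlerSketch` is hypothetical and does not implement Kafka's actual fault-handler interface. `Runtime.halt` does not run shutdown hooks, so the faulting thread never waits on a hook that is waiting on it; the trade-off is that any cleanup those hooks would have done is skipped too. The halt action is injectable so the class can be exercised without terminating the JVM.

```java
import java.util.function.IntConsumer;

// Hypothetical fault handler that terminates via Runtime.halt instead of Runtime.exit.
public class HaltingFaultHandlerSketch {
    private final IntConsumer haltProcedure;

    // Production default: halt immediately, without running shutdown hooks.
    public HaltingFaultHandlerSketch() {
        this(status -> Runtime.getRuntime().halt(status));
    }

    // Injectable halt action, e.g. a recording lambda in tests.
    public HaltingFaultHandlerSketch(IntConsumer haltProcedure) {
        this.haltProcedure = haltProcedure;
    }

    // Report the fault, then halt with a non-zero status.
    public void handleFault(String failureMessage, Throwable cause) {
        System.err.println("Encountered fatal fault: " + failureMessage);
        if (cause != null) {
            cause.printStackTrace();
        }
        haltProcedure.accept(1);
    }
}
```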
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1100539790

## streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java: ##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will replace the first.
+ *
+ * Each store instance has an associated, fixed-duration "history retention" which specifies
+ * how long old record versions should be kept for. In particular, a versioned store guarantees
+ * to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp
+ * bound is within history retention of the current observed stream time. (Queries with timestamp
+ * bound older than the specified history retention are considered invalid.)
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public interface VersionedKeyValueStore<K, V> extends StateStore {
+
+    /**
+     * Add a new record version associated with this key.
+     *
+     * @param key The key
+     * @param value The value, it can be {@code null};
+     *              if the serialized bytes are also {@code null} it is interpreted as a delete
+     * @param timestamp The timestamp for this record version
+     * @throws NullPointerException If {@code null} is used for key.
+     */
+    void put(K key, V value, long timestamp);
+
+    /**
+     * Delete the value associated with this key from the store, at the specified timestamp
+     * (if there is such a value), and return the deleted value.
+     *
+     * This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
+     * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}.
+     *
+     * @param key The key
+     * @param timestamp The timestamp for this delete
+     * @return The value and timestamp of the latest record associated with this key
+     *         as of the deletion timestamp (inclusive), or {@code null} if any of

Review Comment:
Totally -- JavaDocs are not public API. It's, well, _docs_; if we have them in the KIP, it's just a template/placeholder. No need to go back to the KIP (otherwise we could never change JavaDocs without altering older KIPs).

## streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java: ##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will replace the
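The Javadoc under review specifies the store's semantics: one version per (key, timestamp), lookups that return the latest version at or before a given timestamp, and `delete` defined as `get` followed by `put(key, null, timestamp)`. Below is a minimal in-memory sketch of those semantics, assuming a `TreeMap` per key and treating a `null` value as a tombstone. History retention and the RocksDB segment layout are left out, and `InMemoryVersionedStoreSketch` is a hypothetical class, not the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory illustration of versioned key-value semantics (no retention, no segments).
public class InMemoryVersionedStoreSketch<K, V> {
    // A value together with the timestamp from which it is valid.
    public record Versioned<T>(T value, long timestamp) { }

    // Per key: timestamp -> value, where a null value marks a deletion (tombstone).
    private final Map<K, NavigableMap<Long, V>> versions = new HashMap<>();

    public void put(K key, V value, long timestamp) {
        if (key == null) throw new NullPointerException("key cannot be null");
        // A second put with the same key and timestamp replaces the first.
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Latest version with timestamp <= asOfTimestamp, or null if none exists or it is a tombstone.
    public Versioned<V> get(K key, long asOfTimestamp) {
        NavigableMap<Long, V> history = versions.get(key);
        if (history == null) return null;
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        if (entry == null || entry.getValue() == null) return null;
        return new Versioned<>(entry.getValue(), entry.getKey());
    }

    // Semantically get(key, timestamp) followed by put(key, null, timestamp).
    public Versioned<V> delete(K key, long timestamp) {
        Versioned<V> previous = get(key, timestamp);
        put(key, null, timestamp);
        return previous;
    }
}
```

`NavigableMap.floorEntry` does the "latest as of" lookup in logarithmic time; the real store instead splits older versions across time-based segments so that expired history can be dropped in bulk.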
[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1100547585

## tools/src/main/java/org/apache/kafka/tools/MessageReader.java: ##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Typical implementations of this interface convert data from an {@link InputStream} received via
+ * {@link MessageReader#init(InputStream, Properties)} into a {@link ProducerRecord} instance on each
+ * invocation of `{@link MessageReader#readMessage()}`.
+ *
+ * This is used by the {@link ConsoleProducer}.
+ */
+public interface MessageReader {

Review Comment:
Note that this interface is only used by the `ConsoleProducer` and has one implementation. If no further use is intended, it can be removed.
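The Javadoc describes the contract: `init` hands the reader an `InputStream` plus `Properties`, and each `readMessage()` call produces one record. Below is a minimal line-per-record sketch of an implementation. It assumes a `topic` property and a `key.separator` property (both names are assumptions here), uses a local `SimpleRecord` in place of `ProducerRecord` so it compiles without kafka-clients, and returns `null` at end of input. `LineReaderSketch` is hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical line-oriented reader following the MessageReader contract described in the Javadoc.
public class LineReaderSketch {
    // Stand-in for ProducerRecord so the sketch has no Kafka dependency.
    public record SimpleRecord(String topic, String key, String value) { }

    private BufferedReader reader;
    private String topic;
    private String separator; // null means lines carry no key

    public void init(InputStream input, Properties props) {
        this.reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
        this.topic = props.getProperty("topic");               // assumed property name
        this.separator = props.getProperty("key.separator");   // assumed property name
    }

    // Returns one record per input line, or null once the input is exhausted.
    public SimpleRecord readMessage() {
        try {
            String line = reader.readLine();
            if (line == null) return null;
            if (separator != null) {
                int idx = line.indexOf(separator);
                if (idx >= 0) {
                    return new SimpleRecord(topic, line.substring(0, idx), line.substring(idx + separator.length()));
                }
            }
            return new SimpleRecord(topic, null, line);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```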
[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686048#comment-17686048 ]

ASF GitHub Bot commented on KAFKA-14692:

mumrah merged PR #487:
URL: https://github.com/apache/kafka-site/pull/487

> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
> Issue Type: Improvement
> Components: docs
> Reporter: Mickael Maison
> Assignee: David Arthur
> Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> 1) It completely skips the fact that the storage for the new quorum should be formatted using the existing cluster id.
> 2) In "Provisioning the KRaft controller quorum", {{controller.quorum.voters=1@localhost:9093}} should be {{controller.quorum.voters=3000@localhost:9093}}, as node.id=3000.
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However, if I do that, my brokers fail to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping zookeeper.connect to get past this step, the brokers print a lot of error messages when they restart:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
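Point 2 of the report comes down to a consistency rule: each controller's `node.id` must appear as an id in `controller.quorum.voters`, whose entries take the form `id@host:port`. Below is a hypothetical pre-flight check for that rule. It is not part of Kafka, and it assumes both properties are present and well-formed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical check that a controller's node.id is listed in controller.quorum.voters.
public class QuorumVotersCheck {
    // Parses "1@host1:9093,2@host2:9093" into {1 -> "host1:9093", 2 -> "host2:9093"}.
    public static Map<Integer, String> parseVoters(String voters) {
        Map<Integer, String> result = new HashMap<>();
        for (String entry : voters.split(",")) {
            String trimmed = entry.trim();
            int at = trimmed.indexOf('@');
            if (at <= 0) {
                throw new IllegalArgumentException("Malformed voter entry: " + trimmed);
            }
            result.put(Integer.parseInt(trimmed.substring(0, at)), trimmed.substring(at + 1));
        }
        return result;
    }

    // True if this node's node.id appears among the configured voters.
    public static boolean nodeIsVoter(Properties props) {
        int nodeId = Integer.parseInt(props.getProperty("node.id"));
        return parseVoters(props.getProperty("controller.quorum.voters")).containsKey(nodeId);
    }
}
```

With `node.id=3000`, the documented value `1@localhost:9093` fails this check, while `3000@localhost:9093` passes.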
[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686045#comment-17686045 ]

ASF GitHub Bot commented on KAFKA-14692:

mumrah commented on PR #487:
URL: https://github.com/apache/kafka-site/pull/487#issuecomment-1423068675

> Can you also open the same PR against kafka:3.4?

Yes, I'll work on that later this week.
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1098117328

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ##
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ *
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ *
+ * a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, long)} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ * associated with the same key, and is implicitly associated with the record. This timestamp
+ * can change as new records are inserted into the store.
+ *
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ *
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+    private final String name;
+    private final long historyRetention;
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final RocksDBStore latestValueStore;
+    private final LogicalKeyValueSegments segmentStores;
+    private final LatestValueSchema latestValueSchema;
+    private final SegmentValueSchema segmentValueSchema;
+    private final
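The Javadoc states that old versions are placed in segments by their `validTo` timestamp so that whole segments can be dropped once they fall outside history retention. Below is a minimal sketch of that arithmetic, assuming fixed-length segments of `segmentIntervalMs` and non-negative timestamps. The names and the exact expiry rule are illustrative and may differ from what `RocksDBVersionedStore` and `LogicalKeyValueSegments` actually do.

```java
// Hypothetical segment arithmetic for a versioned store that files old versions by validTo.
public final class SegmentMathSketch {
    private SegmentMathSketch() { }

    // Segment holding a version whose validity interval is [validFrom, validTo).
    public static long segmentIdFor(long validTo, long segmentIntervalMs) {
        return validTo / segmentIntervalMs;
    }

    // validFrom is inclusive and validTo is exclusive, as described in the Javadoc.
    public static boolean validAt(long validFrom, long validTo, long asOf) {
        return validFrom <= asOf && asOf < validTo;
    }

    // A segment can be dropped once even its largest possible validTo is at or before the oldest
    // queryable timestamp (streamTime - historyRetention): every version in it stopped being
    // valid before any query that is still within retention.
    public static boolean segmentExpired(long segmentId, long segmentIntervalMs,
                                         long streamTime, long historyRetentionMs) {
        long maxValidToInSegment = (segmentId + 1) * segmentIntervalMs - 1;
        return maxValidToInSegment <= streamTime - historyRetentionMs;
    }
}
```

For example, with 1000 ms segments, a stream time of 5000 and 3000 ms of history retention, segment 1 (validTo in [1000, 2000)) can be dropped, but segment 2 cannot, because a query at 2000 may still need a version valid until 2999.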
[GitHub] [kafka] ahuang98 opened a new pull request, #13221: MINOR: Fix hyperlink tags in upgrade docs
ahuang98 opened a new pull request, #13221:
URL: https://github.com/apache/kafka/pull/13221

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686042#comment-17686042 ]

ASF GitHub Bot commented on KAFKA-14692:

mumrah commented on code in PR #487:
URL: https://github.com/apache/kafka-site/pull/487#discussion_r1100528500

## 34/ops.html: ##
@@ -3615,8 +3620,21 @@ Preparing for migration
 Provisioning the KRaft controller quorum
 Two things are needed before the migration can begin. The brokers must be configured to support the migration and
-a KRaft controller quorum must be deployed. For the KRaft deployment, please refer to the above documentation.
-The KRaft controllers should be provisioned with the latest metadata.version of "3.4".
+a KRaft controller quorum must be deployed. The KRaft controllers should be provisioned with the same cluster ID as
+the existing Kafka cluster. This can be found by examining one of the "meta.properties" files in the data directories
+of the brokers, or by running the following command.
+
+
+
+./bin/zookeeper-shell.sh localhost:2181 get /cluster/id

Review Comment:
Good call. I'll adjust the other pre-formatted sections as well.
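The docs change says the controllers must use the existing cluster ID, which can be read from a broker's `meta.properties`. Below is a minimal sketch of that lookup. It assumes the file is in standard Java properties format with a `cluster.id` key, as ZooKeeper-mode brokers write it; `ClusterIdSketch` is hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper that reads cluster.id from a broker's meta.properties file.
public class ClusterIdSketch {
    public static String readClusterId(Path metaProperties) throws IOException {
        Properties props = new Properties();
        try (Reader in = Files.newBufferedReader(metaProperties, StandardCharsets.UTF_8)) {
            props.load(in);
        }
        String clusterId = props.getProperty("cluster.id");
        if (clusterId == null || clusterId.isBlank()) {
            throw new IllegalStateException("No cluster.id in " + metaProperties);
        }
        return clusterId.trim();
    }
}
```

The resulting id would then be supplied when formatting the controllers' storage, for example with `bin/kafka-storage.sh format -t <cluster-id> -c <controller config file>`.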
[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686041#comment-17686041 ] ASF GitHub Bot commented on KAFKA-14692: mimaison commented on code in PR #487: URL: https://github.com/apache/kafka-site/pull/487#discussion_r1100522020 ## 34/ops.html: ## @@ -3615,8 +3620,21 @@ Preparing for migration Provisioning the KRaft controller quorum Two things are needed before the migration can begin. The brokers must be configured to support the migration and -a KRaft controller quorum must be deployed. For the KRaft deployment, please refer to the above documentation. -The KRaft controllers should be provisioned with the latest metadata.version of "3.4". +a KRaft controller quorum must be deployed. The KRaft controllers should be provisioned with the same cluster ID as +the existing Kafka cluster. This can be found by examining one of the "meta.properties" files in the data directories +of the brokers, or by running the following command. + + + +./bin/zookeeper-shell.sh localhost:2181 get /cluster/id Review Comment: Text within `` blocks keep the padding and the final line ending. I find the docs much more readable when we remove the spaces and avoid having a new line at the end. ``` ./bin/zookeeper-shell.sh localhost:2181 get /cluster/id ``` This applies to all other formatted blocks in this section. > Issues in Zookeeper to KRaft migration docs > --- > > Key: KAFKA-14692 > URL: https://issues.apache.org/jira/browse/KAFKA-14692 > Project: Kafka > Issue Type: Improvement > Components: docs >Reporter: Mickael Maison >Assignee: David Arthur >Priority: Major > > Following [https://kafka.apache.org/documentation/#kraft_zk_migration] > 1) It completely skips the facts that the storage for the new quorum should > be formatted using the existing cluster id. 
> 2) In Provisioning the KRaft controller quorum: > {{controller.quorum.voters=1@localhost:9093}} should be > {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000 > 3) When migrating brokers, it states: > {code:java} > # Don't set the IBP, KRaft uses "metadata.version" feature flag > # inter.broker.protocol.version=3.4 > # Keep the migration enabled > zookeeper.metadata.migration.enable=true > # Remove ZooKeeper client configuration > # zookeeper.connect=localhost:2181 > {code} > However, if I do that, my brokers fail to restart and print: > {code:java} > org.apache.kafka.common.config.ConfigException: If using > zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must > also be set. > {code} > 4) When disabling zookeeper.metadata.migration.enable or keeping > zookeeper.connect to get past this step, when brokers restart they print a > lot of error messages: > {code:java} > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key LEADER_AND_ISR which is not enabled > [2023-02-08 12:06:25,776] ERROR Exception while processing request from > 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor) > {code}
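mimaison's review notes that the existing cluster ID can be read from a broker's `meta.properties`, which is what item 1 needs for formatting the controller storage. Assuming the file is a standard Java properties file with a `cluster.id` key, a minimal sketch for extracting it might look like this. `MetaProperties.clusterId` is a hypothetical helper, not part of Kafka.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper (not a Kafka API): reads cluster.id from meta.properties content.
final class MetaProperties {
    private MetaProperties() {}

    static Optional<String> clusterId(Reader metaProperties) {
        Properties props = new Properties();
        try {
            props.load(metaProperties); // standard key=value properties syntax
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String id = props.getProperty("cluster.id");
        // Treat a missing or blank value as "no cluster id recorded".
        return (id == null || id.trim().isEmpty()) ? Optional.empty() : Optional.of(id.trim());
    }
}
```

Against a real broker one would pass a reader over the `meta.properties` file in one of the data directories, for example via `java.nio.file.Files.newBufferedReader`.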
[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686029#comment-17686029 ] Mickael Maison commented on KAFKA-14692: In my test cluster the errors persist. After the controller printed "Completed migration of metadata from Zookeeper to KRaft", it started printing "TRACE Sending RPCs to brokers for metadata delta." every 500ms. Then I updated the config of one of the brokers: - changed broker.id to node.id - added process.roles=broker - removed all zookeeper configs. I restarted that broker and it spams: {code:java} [2023-02-08 19:06:20,410] ERROR Exception while processing request from 192.168.1.11:9092-192.168.1.11:65127-216 (kafka.network.Processor) org.apache.kafka.common.errors.InvalidRequestException: Received request api key LEADER_AND_ISR which is not enabled {code} On the controller side, there are a few disconnection messages while broker 0 restarts: {code:java} [2023-02-08 19:05:00,454] INFO [Controller id=1000, targetBrokerId=0] Node 0 disconnected. (org.apache.kafka.clients.NetworkClient) [2023-02-08 19:05:00,454] WARN [Controller id=1000, targetBrokerId=0] Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient) [2023-02-08 19:05:00,473] INFO [Controller id=1000, targetBrokerId=0] Client requested connection close from node 0 (org.apache.kafka.clients.NetworkClient) {code} and then it starts spamming {code:java} [2023-02-08 19:05:09,204] INFO [Controller id=1000, targetBrokerId=0] Cancelled in-flight LEADER_AND_ISR request with correlation id 10 due to node 0 being disconnected (elapsed time since creation: 1535ms, elapsed time since send: 1535ms, request timeout: 3ms) (org.apache.kafka.clients.NetworkClient) [2023-02-08 19:05:09,241] INFO [Controller id=1000, targetBrokerId=0] Client requested connection close from node 0 (org.apache.kafka.clients.NetworkClient) [2023-02-08 19:05:09,381] INFO [Controller id=1000, targetBrokerId=0] Node 0 disconnected. (org.apache.kafka.clients.NetworkClient) {code}
[jira] [Commented] (KAFKA-9803) Allow producers to recover gracefully from transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686015#comment-17686015 ] Travis Bischel commented on KAFKA-9803: --- Would it be possible to move this forward? It looks like timed-out transactions are still unrecoverable as of 3.4, because EndTransactionRequest cannot yet return the TRANSACTION_TIMED_OUT error. > Allow producers to recover gracefully from transaction timeouts > --- > > Key: KAFKA-9803 > URL: https://issues.apache.org/jira/browse/KAFKA-9803 > Project: Kafka > Issue Type: Improvement > Components: producer , streams >Reporter: Jason Gustafson >Assignee: Boyang Chen >Priority: Major > Labels: needs-kip > > Transaction timeouts are detected by the transaction coordinator. When the > coordinator detects a timeout, it bumps the producer epoch and aborts the > transaction. The epoch bump is necessary in order to prevent the current > producer from being able to begin writing to a new transaction which was not > started through the coordinator. > Transactions may also be aborted if a new producer with the same > `transactional.id` starts up. Similarly this results in an epoch bump. > Currently the coordinator does not distinguish these two cases. Both will end > up as a `ProducerFencedException`, which means the producer needs to shut > itself down. > We can improve this with the new APIs from KIP-360. When the coordinator > times out a transaction, it can remember that fact and allow the existing > producer to claim the bumped epoch and continue. Roughly the logic would work > like this: > 1. When a transaction times out, set lastProducerEpoch to the current epoch > and do the normal bump. > 2. Any transactional requests from the old epoch result in a new > TRANSACTION_TIMED_OUT error code, which is propagated to the application. > 3. The producer recovers by sending InitProducerId with the current epoch. > The coordinator returns the bumped epoch.
> One issue that needs to be addressed is how to handle INVALID_PRODUCER_EPOCH > from Produce requests. Partition leaders will not generally know if a bumped > epoch was the result of a timed out transaction or a fenced producer. > Possibly the producer can treat these errors as abortable when they come from > Produce responses. In that case, the user would try to abort the transaction > and then we can see if it was due to a timeout or otherwise.
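The three-step flow proposed in the KAFKA-9803 description can be modelled as a tiny state machine. The sketch below is a toy, single-threaded model that assumes the coordinator tracks only `epoch` and `lastProducerEpoch`; `ToyTxnCoordinator` and its methods are hypothetical and do not reflect Kafka's actual transaction coordinator.

```java
// Toy, single-threaded model of the KAFKA-9803 proposal; not Kafka's transaction coordinator.
final class ToyTxnCoordinator {
    enum Result { NONE, TRANSACTION_TIMED_OUT, PRODUCER_FENCED }

    private static final short NO_EPOCH = -1;

    private short epoch;
    private short lastProducerEpoch = NO_EPOCH; // set only when a transaction times out

    ToyTxnCoordinator(short initialEpoch) {
        this.epoch = initialEpoch;
    }

    // Step 1: remember the epoch of the timed-out transaction, then bump as usual.
    void onTransactionTimeout() {
        lastProducerEpoch = epoch;
        epoch++;
    }

    // A new producer with the same transactional.id also bumps the epoch,
    // but nothing is remembered, so the old producer stays fenced.
    void onNewProducerInstance() {
        lastProducerEpoch = NO_EPOCH;
        epoch++;
    }

    // Step 2: requests from the timed-out epoch get TRANSACTION_TIMED_OUT instead of fencing.
    Result handleTxnRequest(short requestEpoch) {
        if (requestEpoch == epoch) return Result.NONE;
        if (lastProducerEpoch != NO_EPOCH && requestEpoch == lastProducerEpoch) {
            return Result.TRANSACTION_TIMED_OUT;
        }
        return Result.PRODUCER_FENCED;
    }

    // Step 3: InitProducerId sent with the old epoch lets that producer claim the bumped epoch.
    short initProducerId(short requestEpoch) {
        if (lastProducerEpoch == NO_EPOCH || requestEpoch != lastProducerEpoch) {
            throw new IllegalStateException("producer is fenced");
        }
        lastProducerEpoch = NO_EPOCH;
        return epoch;
    }
}
```

The design point the model captures is that only a timeout records `lastProducerEpoch`, while a fencing bump from a new producer instance clears it, so the two cases that currently both surface as `ProducerFencedException` become distinguishable.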
[GitHub] [kafka] Hangleton commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
Hangleton commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1100057051 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. + There is no need to advance the state. + + When started, the tier state machine will fetch the local log start offset of the + leader and then build the follower's remote log aux state until the leader's + local log start offset. 
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); + +private LeaderEndPoint leader; +private ReplicaManager replicaMgr; +private Integer fetchBackOffMs; + +public ReplicaFetcherTierStateMachine(LeaderEndPoint leader, + ReplicaManager replicaMgr) { +this.leader = leader; +this.replicaMgr = replicaMgr; +} + + +/** + * Start the tier state machine for the provided topic partition. Currently, this start method will build the + * entire remote aux log state synchronously. + * + * @param topicPartition the topic partition + * @param currentFetchState the current PartitionFetchState which will + * be used to derive the return value + * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error + * + * @return the new PartitionFetchState after the successful start of the + * tier state machine + */ +public PartitionFetchState start(TopicPartition topicPartition, + PartitionFetchState currentFetchState, + PartitionData fetchPartitionData) throws Exception { + +Tuple2 epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +int epoch = (int) epochAndLeaderLocalStartOffset._1; +long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2; + +long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset); + +Tuple2 fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +long
[GitHub] [kafka] dajac commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
dajac commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1100495875 ## clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java: ## @@ -212,24 +248,32 @@ public static ApiVersionCollection collectApis(Set apiKeys) { * @param listenerType the listener type which constrains the set of exposed APIs * @param minRecordVersion min inter broker magic * @param activeControllerApiVersions controller ApiVersions + * @param enableUnstableLastVersion whether unstable versions should be advertised or not Review Comment: that's right but only the last version of an api could be unstable. this is where the "last" comes from. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
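dajac's reply explains the name `enableUnstableLastVersion`: only the highest version of an API can be unstable, so the flag decides whether that single version is advertised. A minimal sketch of that rule follows; `AdvertisedVersions` and `maxAdvertisedVersion` are hypothetical names, not Kafka's `ApiVersionsResponse` code.

```java
// Hypothetical illustration of the "unstable last version" rule; not Kafka's ApiVersionsResponse logic.
final class AdvertisedVersions {
    private AdvertisedVersions() {}

    // Only the highest version of an API may be unstable, so hiding unstable
    // versions removes at most that single version from the advertised range.
    // In this sketch, a result below the API's minimum version means nothing is advertised.
    static short maxAdvertisedVersion(short latestVersion,
                                      boolean latestVersionUnstable,
                                      boolean enableUnstableLastVersion) {
        if (latestVersionUnstable && !enableUnstableLastVersion) {
            return (short) (latestVersion - 1);
        }
        return latestVersion;
    }
}
```

This is also why the flag answers jeffkbkim's question below: there is never more than one unstable version per API to enable or hide.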
[jira] [Assigned] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reassigned KAFKA-14692: Assignee: David Arthur
[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686012#comment-17686012 ] David Arthur commented on KAFKA-14692: -- Thanks [~mimaison]! Here's a PR with the fixes https://github.com/apache/kafka-site/pull/487 For 4, you should remove the migration flag and the ZK configs. The "Received request api key LEADER_AND_ISR which is not enabled" error means the controller thinks this broker is still in ZK mode. This could happen if there was an in-flight LISR for the previous broker incarnation before the controller got the new registration. Does that error persist, or only happen during startup?
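Item 3 and David Arthur's reply both follow from the consistency rule spelled out in the quoted ConfigException: in KRaft mode, `zookeeper.metadata.migration.enable=true` requires `zookeeper.connect`, so the final broker config has to drop both together. The sketch below restates that rule. It assumes KRaft mode means `process.roles` is set, and `MigrationConfigCheck` is a hypothetical name, not Kafka's own config validation.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical restatement of the rule in the quoted ConfigException; not Kafka's validation code.
final class MigrationConfigCheck {
    private MigrationConfigCheck() {}

    static Optional<String> validate(Properties props) {
        // Assumption for this sketch: a broker is in KRaft mode when process.roles is set.
        String roles = props.getProperty("process.roles");
        boolean kraftMode = roles != null && !roles.trim().isEmpty();
        boolean migrationEnabled = Boolean.parseBoolean(
                props.getProperty("zookeeper.metadata.migration.enable", "false").trim());
        boolean zkConnectSet = props.getProperty("zookeeper.connect") != null;
        if (kraftMode && migrationEnabled && !zkConnectSet) {
            return Optional.of("If using zookeeper.metadata.migration.enable in KRaft mode, "
                    + "zookeeper.connect must also be set.");
        }
        return Optional.empty();
    }
}
```

Removing only `zookeeper.connect`, as the docs' example did, trips the check; removing the migration flag as well, as suggested in the reply, passes.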
[GitHub] [kafka] mimaison commented on a diff in pull request #13203: KAFKA-14048: Add new `__consumer_offsets` records from KIP-848
mimaison commented on code in PR #13203: URL: https://github.com/apache/kafka/pull/13203#discussion_r1100494370 ## group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentKey.json: ## @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// KIP-848 is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "ConsumerGroupCurrentMemberAssignmentKey", + "validVersions": "8", Review Comment: I see, thanks for the explanation
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100491548 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.image.writer.ImageWriter; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.clients.admin.ScramMechanism; + +// import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +// import java.util.function.Consumer; +import java.util.stream.Collectors; + + +/** + * Represents the SCRAM credentials in the metadata image. + * + * This class is thread-safe. + */ +public final class ScramImage { +public static final ScramImage EMPTY = new ScramImage(Collections.emptyMap()); + +private final Map> mechanisms; + +public ScramImage(Map> mechanisms) { +this.mechanisms = Collections.unmodifiableMap(mechanisms); +} + +public void write(ImageWriter writer, ImageWriterOptions options) { Review Comment: This is the same as in `AclsImage.java`
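The `ScramImage` Javadoc declares the class thread-safe, and its constructor wraps the incoming map with `Collections.unmodifiableMap`. That wrapper is a read-only view rather than a copy, so the guarantee relies on callers not mutating the map they pass in. The small demo below shows the difference against a `Map.copyOf` snapshot (Java 10+); `ImmutableMapDemo` is purely illustrative and not taken from the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: Collections.unmodifiableMap is a read-only *view*, Map.copyOf is a snapshot.
final class ImmutableMapDemo {
    private ImmutableMapDemo() {}

    // Returns {viewSizeAfterMutation, copySizeAfterMutation}.
    static int[] sizesAfterCallerMutation() {
        Map<String, Integer> source = new HashMap<>();
        source.put("SCRAM-SHA-256", 1);
        Map<String, Integer> view = Collections.unmodifiableMap(source);
        Map<String, Integer> copy = Map.copyOf(source);
        source.put("SCRAM-SHA-512", 2); // the caller mutates the map it passed in
        return new int[] {view.size(), copy.size()};
    }
}
```

Both approaches are shallow, so nested maps would need the same treatment. Whether any of this matters depends on how callers build the image, which this excerpt does not show.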
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jeffkbkim commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1100490932 ## clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java: ## @@ -212,24 +248,32 @@ public static ApiVersionCollection collectApis(Set apiKeys) { * @param listenerType the listener type which constrains the set of exposed APIs * @param minRecordVersion min inter broker magic * @param activeControllerApiVersions controller ApiVersions + * @param enableUnstableLastVersion whether unstable versions should be advertised or not Review Comment: i'm a bit confused on the word "last". does this mean we enable only if the last version is unstable? my understanding is we enable all unstable versions or we don't
[GitHub] [kafka] mimaison commented on a diff in pull request #13137: KAFKA-10586: Intra-cluster communication for Mirror Maker 2
mimaison commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1100386611 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java: ## @@ -80,18 +81,25 @@ public class MirrorMakerConfig extends AbstractConfig { static final String TARGET_CLUSTER_PREFIX = "target.cluster."; static final String SOURCE_PREFIX = "source."; static final String TARGET_PREFIX = "target."; +static final String ENABLE_INTERNAL_REST_CONFIG = "dedicated.mode.enable.internal.rest"; +private static final String ENABLE_INTERNAL_REST_DOC = +"Whether to bring up an internal-only REST server that allows multi-node clusters to operate correctly"; Review Comment: Nit: We usually have a period at the end of descriptions. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.rest.resources; + +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.NotFoundException; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.UriInfo; +import java.util.List; +import java.util.Map; + +@Path("/{source}/{target}/connectors") +public class InternalMirrorResource extends InternalClusterResource { + +@Context +private UriInfo uriInfo; + +private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class); + +private final Map herders; + +public InternalMirrorResource(Map herders, RestClient restClient) { +super(restClient); +this.herders = herders; +} + +@Override +protected Herder herderForRequest() { +String source = pathParam("source"); +String target = pathParam("target"); +Herder result = herders.get(new SourceAndTarget(source, target)); +if (result == null) { +log.debug("Failed to find herder for source '{}' and target '{}'", source, target); +throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'"); +} +return result; +} + +private String pathParam(String name) { +List result = uriInfo.getPathParameters().get(name); +if (result == null || result.isEmpty()) +throw new NotFoundException(); Review Comment: Should we add a message here? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.rest.resources; + +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.NotFoundException; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import
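On mimaison's question about adding a message to the `NotFoundException` in `pathParam`, a minimal stand-alone sketch that names the missing parameter might look like this. It assumes the path parameters are available as a `Map<String, List<String>>` (JAX-RS's `MultivaluedMap` implements that interface), substitutes `java.util.NoSuchElementException` for `javax.ws.rs.NotFoundException` to stay dependency-free, and assumes the method returns the first value, which the truncated diff does not show. `PathParams` is a hypothetical name.

```java
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-alone pathParam(); NoSuchElementException stands in for javax.ws.rs.NotFoundException.
final class PathParams {
    private PathParams() {}

    static String pathParam(Map<String, List<String>> pathParameters, String name) {
        List<String> values = pathParameters.get(name);
        if (values == null || values.isEmpty()) {
            // Name the missing parameter so the 404 explains itself.
            throw new NoSuchElementException("Missing required path parameter '" + name + "'");
        }
        return values.get(0);
    }
}
```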
[GitHub] [kafka] dajac commented on pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
dajac commented on PR #12972: URL: https://github.com/apache/kafka/pull/12972#issuecomment-1422991741 Thanks @hachikuji!
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100467620 ## metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java: ## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData; +import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion; +import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion; +import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData; +import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult; +import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM; + + +/** + * Manages SCRAM credentials. 
+ */ +public class ScramControlManager { +static final int MAX_ITERATIONS = 16384; + +static class Builder { +private LogContext logContext = null; +private SnapshotRegistry snapshotRegistry = null; + +Builder setLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) { +this.snapshotRegistry = snapshotRegistry; +return this; +} + +ScramControlManager build() { +if (logContext == null) logContext = new LogContext(); +if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); +return new ScramControlManager(logContext, +snapshotRegistry); +} +} + +static class ScramCredentialKey { +private final String username; +private final ScramMechanism mechanism; + +ScramCredentialKey(String username, ScramMechanism mechanism) { +this.username = username; +this.mechanism = mechanism; +} + +@Override +public int hashCode() { +return Objects.hash(username, mechanism); +} + +@Override +public boolean equals(Object o) { +if (o == null) return false; +if (!(o.getClass() == this.getClass())) return false; +ScramCredentialKey other = (ScramCredentialKey) o; +return username.equals(other.username) && +mechanism.equals(other.mechanism); +} + +@Override +public String toString() { +return "ScramCredentialKey" + +"(username=" + username + +", mechanism=" + mechanism + +")"; +} +} + +static class ScramCredentialValue { +private final byte[] salt; +private final byte[] saltedPassword; +private final int iterations; + +ScramCredentialValue( +byte[] salt, +byte[] saltedPassword, +int iterations +) { +this.salt = salt; +this.saltedPassword = saltedPassword; +this.iterations = iterations; +} + +@Override +
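`ScramControlManager` caps the SCRAM iteration count at `MAX_ITERATIONS = 16384`. A bounds check of the kind that constant suggests might look like the sketch below. The lower bound of 4096 is an assumption (the usual minimum for SCRAM-SHA-256/512 in Kafka), the `Outcome` values only borrow names from the statically imported `Errors` constants, and the real validation logic is not part of this excerpt.

```java
// Hypothetical validation sketch; MAX_ITERATIONS mirrors the constant in the diff,
// MIN_ITERATIONS = 4096 is an assumed SCRAM-SHA-256/512 minimum.
final class ScramIterationCheck {
    static final int MIN_ITERATIONS = 4096;
    static final int MAX_ITERATIONS = 16384;

    enum Outcome { NONE, UNACCEPTABLE_CREDENTIAL }

    private ScramIterationCheck() {}

    static Outcome checkIterations(int iterations) {
        // Reject counts that are too weak or too expensive to verify.
        if (iterations < MIN_ITERATIONS || iterations > MAX_ITERATIONS) {
            return Outcome.UNACCEPTABLE_CREDENTIAL;
        }
        return Outcome.NONE;
    }
}
```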
[GitHub] [kafka] hachikuji commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
hachikuji commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1100465745 ## generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java: ## @@ -411,7 +409,7 @@ private void generateListenerTypesEnum() { } private void generateHighestStableVersion() { Review Comment: nit: `generateHighestSupportedVersion`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1100463203 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. Review Comment: Not this one. We will introduce another one for the client. They are close but different and we want to evolve them separately in the future.
[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1100462303 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; + +/** + * The assignment specification for a consumer group member. + */ +public class AssignmentMemberSpec { Review Comment: The server-side assignor does not have those: https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-BrokerAPI. Note that the KIP introduces two new `PartitionAssignor` interfaces: one on the server side and one on the client side. They are slightly different. Regarding the `reason`, it is basically an error code defined by a custom assignor (e.g. Streams). The assignor uses it to pass information between a member and the assignor.
You can see the ones that Streams will use [here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberMetadataReasons). Using a string is not really appropriate for this.
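The sketch below makes the byte-versus-string trade-off concrete. A custom assignor owns a small table of reason codes. Everything in it is hypothetical: the class name and code values are made up for illustration and are not the Streams reasons defined in KIP-848. The coordinator would only carry the raw byte. Only the assignor that defined the codes can render them as readable text, for example in logs.

```java
import java.util.Map;

final class MemberReasonSketch {
    // Hypothetical codes a custom assignor might define. The group coordinator
    // would store and forward the raw byte without interpreting it.
    static final byte NONE = 0;
    static final byte SHUTDOWN_REQUESTED = 1;
    static final byte REBALANCE_REQUESTED = 2;

    private static final Map<Byte, String> NAMES = Map.of(
            NONE, "NONE",
            SHUTDOWN_REQUESTED, "SHUTDOWN_REQUESTED",
            REBALANCE_REQUESTED, "REBALANCE_REQUESTED");

    // Renders a code for logging; codes this assignor does not know are shown numerically.
    static String describe(byte reason) {
        return NAMES.getOrDefault(reason, "UNKNOWN(" + reason + ")");
    }
}
```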
[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush
C0urante commented on code in PR #13208: URL: https://github.com/apache/kafka/pull/13208#discussion_r1100459265 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java: ## @@ -98,6 +104,24 @@ private boolean flushing() { return toFlush != null; } +public boolean waitForBeginFlush(Supplier timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException { Review Comment: It seems like we're bending over backwards here to accommodate an assumption made in `beginFlush` that we'll never try to trigger two offset flushes at once. That assumption is clearly false given the conditions that necessitate this fix (i.e., a task's end-of-life offset flush is triggered at the same time as its periodic offset flush). Given that, do we really need a separate method here, or can we relax the constraints in `beginFlush` to wait for in-progress flushes to conclude instead of throwing an exception if there are any? Additionally, the use of a `CompletableFuture` here seems a bit strange. Would a `Semaphore` or `CountDownLatch` be better suited? Finally, since this change may lead to us performing double offset commits when a task is being shut down, do you think it might also make sense to add a `close` method to the offset writer? It would throw an exception for any further attempts to flush, and possibly forcibly terminate any in-progress flushes. We can invoke that in `AbstractWorkerTask::cancel` (or possibly `WorkerSourceTask::cancel` if a different approach is necessary to preserve exactly-once semantics) to help tasks complete shutdown within the timeout allotted to them.
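The sketch below shows one way the suggestion above could look: `beginFlush` waits, with a bound, for an in-progress flush instead of throwing, a `CountDownLatch` replaces the `CompletableFuture`, and a `close` method rejects further flushes. `FlushGateSketch` and its methods are hypothetical. They only model the flush slot, not the real `OffsetStorageWriter` state. The sketch also does not forcibly terminate a running flush.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FlushGateSketch {
    private final Object lock = new Object();
    private CountDownLatch inProgress; // non-null while a flush is running
    private boolean closed;

    // Waits (bounded) for a running flush to finish instead of throwing immediately.
    void beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            CountDownLatch running;
            synchronized (lock) {
                if (closed) {
                    throw new IllegalStateException("Offset writer is closed; no further flushes allowed");
                }
                if (inProgress == null) {
                    inProgress = new CountDownLatch(1); // claim the flush slot
                    return;
                }
                running = inProgress;
            }
            long remaining = deadline - System.nanoTime();
            // Wait outside the lock so the flushing thread can call completeFlush().
            if (remaining <= 0 || !running.await(remaining, TimeUnit.NANOSECONDS)) {
                throw new TimeoutException("Timed out waiting for in-progress offset flush");
            }
            // Loop: another waiter may have claimed the slot first.
        }
    }

    // Called when the current flush succeeds or fails; wakes up any waiters.
    void completeFlush() {
        synchronized (lock) {
            if (inProgress != null) {
                inProgress.countDown();
                inProgress = null;
            }
        }
    }

    // Rejects any further flush attempts, e.g. when the task is cancelled.
    void close() {
        synchronized (lock) {
            closed = true;
        }
    }
}
```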
[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1100458653 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; + +/** + * The assignment specification for a consumer group member. Review Comment: That's correct.
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100451315 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -816,6 +817,17 @@ class ControllerApis(val requestChannel: RequestChannel, } } + def handleAlterUserScramCredentials(request: RequestChannel.Request): CompletableFuture[Unit] = { +val alterRequest = request.body[AlterUserScramCredentialsRequest] +val context = new ControllerRequestContext(request.context.header.data, request.context.principal, Review Comment: Let's chat about this. Currently, talking directly to the controller is not allowed for these operations; only brokers are allowed to do so.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
jeffkbkim commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1100419447 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; + +/** + * The assignment specification for a consumer group member. + */ +public class AssignmentMemberSpec { Review Comment: From KIP-848, it looks like we're missing ``` /** * The reason reported by the member. */ byte reason; ... * The metadata reported by the member. */ Metadata metadata; ``` What's the reason we omitted these? Also, why can't we store "reason" as a string? I'm asking because this made things harder to debug on the server side. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; + +/** + * The assignment specification for a consumer group member. Review Comment: To confirm: this partition assignor is only used by the new consumer group, hence "consumer group". Is this correct? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. Review Comment: Isn't this used by the new client-side assignor as well? > The new PartitionAssignor interface will be introduced to replace the ConsumerPartitionAssignor interface Also, the KIP mentions that we'll be adding uniform and range server-side assignors. What about client-side assignors?
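The sketch below illustrates what a range-style strategy does for a single topic, using the classic approach of the existing client-side range assignor. Members are sorted, and each member gets a contiguous block of partitions. The first `numPartitions % members` members each get one extra partition. The class and method names are hypothetical. A real server-side assignor would presumably work on KIP-848's spec classes such as `AssignmentMemberSpec` rather than on plain lists, and the uniform assignor is not shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeAssignmentSketch {
    // Assigns partitions 0..numPartitions-1 of one topic to members in contiguous ranges.
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members); // deterministic order, independent of input order
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int n = members.size();
        if (n == 0) {
            return result;
        }
        int perMember = numPartitions / n;
        int extra = numPartitions % n; // the first `extra` members get one more partition
        int next = 0;
        for (int i = 0; i < n; i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            result.put(members.get(i), partitions);
        }
        return result;
    }
}
```

For example, members `b` and `a` sharing five partitions yields `a -> [0, 1, 2]` and `b -> [3, 4]`.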
[GitHub] [kafka] dajac commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
dajac commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100441588 ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -2455,7 +2454,7 @@ private void close(Duration timeout, boolean swallowException) { closeTimer.reset(remainingDurationInTimeout); // This is a blocking call bound by the time remaining in closeTimer -LambdaUtils.swallow(() -> fetcher.close(closeTimer), firstException); +Utils.swallow(log, " fetcher close", () -> fetcher.close(closeTimer), firstException); Review Comment: nit: There is an extra space before `fetcher`. ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -238,7 +238,7 @@ public KafkaClusterTestKit build() throws Exception { bootstrapMetadata); } catch (Throwable e) { log.error("Error creating controller {}", node.id(), e); -Utils.swallow(log, "sharedServer.stopForController", () -> sharedServer.stopForController()); +Utils.swallow(log, "sharedServer.stopForController", () -> sharedServer.stopForController(), null); Review Comment: nit: Should we add an overload to avoid having to pass null here? ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -998,15 +998,18 @@ public static void closeAll(Closeable... closeables) throws IOException { throw exception; } -public static void swallow( -Logger log, -String what, -Runnable runnable -) { -try { -runnable.run(); -} catch (Throwable e) { -log.warn("{} error", what, e); +/** + * Run the supplied code. If an exception is thrown, it is swallowed and registered to the firstException parameter. + */ +public static void swallow(Logger log, String what, final Runnable code, final AtomicReference firstException) { +if (code != null) { +try { +code.run(); +} catch (Throwable t) { +log.warn("{} error", what, t); Review Comment: Should this be an error instead of a warn to be consistent with `closeQuietly`? Moreover, I wonder if we could improve the error message. 
We would get something like `fetcher close error`, which is not really in line with what we usually log. For instance, `closeQuietly` would log something like `Failed to close fetch...`. Do you have any thoughts on this?
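The sketch below covers the three review points above: a 3-argument overload so callers need not pass `null`, error-level logging, and a `Failed to ...` style message. Kafka's `Utils.swallow` takes an SLF4J `Logger`. To stay self-contained, this hypothetical `SwallowSketch` uses `java.util.logging` instead, where `Level.SEVERE` plays the role of error.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

final class SwallowSketch {
    // Overload for callers that do not track a first exception, so they need not pass null.
    static void swallow(Logger log, String what, Runnable code) {
        swallow(log, what, code, null);
    }

    // Runs the code; any Throwable is logged and recorded (first one wins) instead of propagating.
    static void swallow(Logger log, String what, Runnable code, AtomicReference<Throwable> firstException) {
        if (code == null) {
            return;
        }
        try {
            code.run();
        } catch (Throwable t) {
            // Error level and "Failed to ..." wording, in the style of closeQuietly.
            log.log(Level.SEVERE, "Failed to " + what, t);
            if (firstException != null) {
                firstException.compareAndSet(null, t);
            }
        }
    }
}
```

A call site would then read like `swallow(log, "close fetcher", () -> fetcher.close(closeTimer), firstException)`.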
[jira] [Commented] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
[ https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685984#comment-17685984 ] Chris Egerton commented on KAFKA-14682: --- Thanks Christo! > Unused stubbings are not reported by Mockito during CI builds > Key: KAFKA-14682 > URL: https://issues.apache.org/jira/browse/KAFKA-14682 > Project: Kafka > Issue Type: Test > Components: unit tests > Reporter: Chris Egerton > Assignee: Christo Lolov > Priority: Major > We've started using [strict stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] for unit tests written with Mockito, which is supposed to automatically fail tests when they set up mock expectations that go unused. > However, these failures are not reported during Jenkins builds, even if they are reported when building/testing locally. > In at least one case, this difference appears to be because our [Jenkins build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the project's [Gradle build file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter instead of the former, which can cause tests to fail due to unnecessary stubbings when being run in that IDE but not when being built on Jenkins. > It's possible that, because the custom test tasks filter out some tests from running, Mockito does not check for unnecessary stubbings in order to avoid incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method.
[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1422939422 Oh, and the Jenkins build seems to be consistently failing on the `IdentityReplicationIntegrationTest::testNoCheckpointsIfNoRecordsAreMirrored` test case. We probably want to look into that before we merge.
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1099340592 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int cnt = 0; for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++) for (int p = 0; p < numPartitions; p++) -cluster.kafka().produce(topicName, p, "key", "value-" + cnt++); +produce(cluster, topicName, p, "key", "value-" + cnt++); } - + +protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) { +cluster.kafka().produce(topic, partition, key, value); +} + +protected static Map waitForAnyCheckpoint( Review Comment: Nit: why "anyCheckpoint" instead of "allCheckpoints", given that we're ensuring that there's a checkpoint for every partition of the topic, instead of only one? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr // Offset is too far in the past to translate accurately return OptionalLong.of(-1L); } -long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset(); -return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep); +return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1)); Review Comment: I believe this part is correct, but it took me a while to grok the motivation here. Could we pull this out into two lines, one to determine the amount we bump by (i.e., 0 or 1) with a comment providing a rationale for the logic (including why we can't do a more aggressive bump derived from the delta between the upstream offsets for the consumer group and the checkpoint), and one to return the downstream offset plus that bump? 
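The sketch below follows the two-line restructuring requested above. It makes two assumptions. First, the hidden "too far in the past" condition is taken to be that the sync's upstream offset is ahead of the group's offset. Second, the rationale in the comment is a reading consistent with this PR's goal of avoiding negative downstream lag; it is not confirmed by the diff. `OffsetTranslationSketch` and its nested `OffsetSync` are hypothetical stand-ins for MM2's `OffsetSyncStore` and offset-sync record.

```java
import java.util.OptionalLong;

final class OffsetTranslationSketch {
    // Hypothetical stand-in for the upstream/downstream offset pair in an MM2 offset sync.
    static final class OffsetSync {
        final long upstreamOffset;
        final long downstreamOffset;

        OffsetSync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    static OptionalLong translateDownstream(OffsetSync sync, long upstreamOffset) {
        if (sync.upstreamOffset > upstreamOffset) {
            // Assumed condition: the group's offset is older than the sync, so it cannot be translated accurately.
            return OptionalLong.of(-1L);
        }
        // At the synced upstream offset, the synced downstream offset is exact. Past it, we only know the
        // group is at least one record further. Upstream and downstream offsets need not advance in lockstep,
        // so adding the full upstream delta could overshoot the real downstream position.
        long downstreamBump = sync.upstreamOffset == upstreamOffset ? 0L : 1L;
        return OptionalLong.of(sync.downstreamOffset + downstreamBump);
    }
}
```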
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ## @@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio // have been automatically synchronized from primary to backup by the background job, so no // more records to consume from the replicated topic by the same consumer group at backup cluster assertEquals(0, records.count(), "consumer record size is not zero"); +backupConsumer.close(); Review Comment: Good catch! Can we use a try-with-resources block for these consumers? That would set a precedent that makes it harder to leak them in the future. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -725,11 +730,20 @@ protected static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream() .mapToLong(OffsetAndMetadata::offset).sum(); -Map offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS); -long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum(); - +Map offsets = Review Comment: Nit: "offsets" and "totalOffsets" are a little generic and confusing; WDYT about "endOffsets" and "totalEndOffsets"?
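The try-with-resources suggestion above works because `KafkaConsumer` is `Closeable`, so the consumer is closed even when an assertion inside the block fails. The sketch below shows the pattern. It uses a hypothetical `FakeConsumer` in place of a real consumer so that it runs without a cluster.

```java
final class TryWithResourcesSketch {
    // Hypothetical stand-in for a test consumer; a real KafkaConsumer is also AutoCloseable.
    static final class FakeConsumer implements AutoCloseable {
        boolean closed;

        int poll() {
            return 0; // pretend no records are left to consume
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static FakeConsumer lastConsumer; // exposed only so the example can be checked

    // Normal exit: the consumer is closed when the block ends.
    static int drainRemaining() {
        try (FakeConsumer consumer = new FakeConsumer()) {
            lastConsumer = consumer;
            return consumer.poll();
        }
    }

    // Failed assertion: the consumer is still closed before the error propagates.
    static void drainAndFail() {
        try (FakeConsumer consumer = new FakeConsumer()) {
            lastConsumer = consumer;
            throw new AssertionError("consumer record size is not zero");
        }
    }
}
```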
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio Map translatedOffsets = backupClient.remoteConsumerOffsets( consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L)); return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) && - translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); + !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); Review Comment: If I'm reading this correctly, we don't expect `tp2` to have translated offsets here because the upstream consumer group offset for this partition is behind the checkpoints emitted for it. If that's correct: 1. Any idea why this was passing before? 2. Do you think we should expand the test to commit an offset for this partition in the consumer group, then verify that that offset is included in the translated offsets afterward? ##
[GitHub] [kafka] omkreddy commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
omkreddy commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1100299605 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -816,6 +817,17 @@ class ControllerApis(val requestChannel: RequestChannel, } } + def handleAlterUserScramCredentials(request: RequestChannel.Request): CompletableFuture[Unit] = { +val alterRequest = request.body[AlterUserScramCredentialsRequest] +val context = new ControllerRequestContext(request.context.header.data, request.context.principal, Review Comment: We need an authorization check (`authHelper.authorizeClusterOperation(request, ALTER)`) here. There is a similar check in KafkaApis: [handleAlterUserScramCredentialsRequest](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3326) ## core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala: ## @@ -85,15 +113,17 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(), ) requests.foreach(request => { - val response = sendAlterUserScramCredentialsRequest(request) + val response = sendAlterUserScramCredentialsRequest(request, adminSocketServer) val results = response.data.results assertEquals(2, results.size) checkAllErrorsAlteringCredentials(results, Errors.DUPLICATE_RESOURCE, "when altering the same credential twice in a single request") }) } - @Test - def testAlterEmptyUser(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("kraft", "zk")) + def testAlterEmptyUser(quorum: String): Unit = { + println("Starting test") Review Comment: Unwanted line. ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -221,6 +223,21 @@ class BrokerMetadataPublisher( s"quotas in ${deltaName}", t) } + // Apply changes to SCRAM credentials.
+ Option(delta.scramDelta()).foreach { scramDelta => Review Comment: May I know how we are applying these changes on controller nodes? https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ControllerServer.scala#L176 ## metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; +import org.apache.kafka.common.security.scram.ScramCredential; +import org.apache.kafka.common.security.scram.internals.ScramFormatter; +import org.apache.kafka.common.utils.Bytes; + +import java.security.GeneralSecurityException; +import java.util.Arrays; +import java.util.Objects; + + +/** + * Represents the ACLs in the metadata image. + * + * This class is thread-safe.
+ */ +public final class ScramCredentialData { +private final byte[] salt; +private final byte[] saltedPassword; +private final int iterations; + +static ScramCredentialData fromRecord( +UserScramCredentialRecord record +) { +return new ScramCredentialData( +record.salt(), +record.saltedPassword(), +record.iterations()); +} + +public ScramCredentialData( +byte[] salt, +byte[] saltedPassword, +int iterations +) { +this.salt = salt; +this.saltedPassword = saltedPassword; +this.iterations = iterations; +} + +public byte[] salt() { +return salt; +} + +public byte[] saltedPassword() { +return saltedPassword; +} + +public int iterations() { +return iterations; +} + +public UserScramCredentialRecord toRecord( +String userName, +ScramMechanism mechanism +) { +return new UserScramCredentialRecord(). +setName(userName). +setMechanism(mechanism.type()). +setSalt(salt). +
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
divijvaidya commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100385270 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -447,6 +430,33 @@ private RequestFuture sendMetadataRequest(MetadataRequest.Builde return client.send(node, request); } +/** + * Send Fetch Request to Kafka cluster asynchronously. + * + * This method is visible for testing. + * + * @return A future that indicates result of sent Fetch request + */ +private RequestFuture sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData, + final Node fetchTarget) { Review Comment: fixed
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
divijvaidya commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100384922 ## clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Lambda helpers. + */ +@FunctionalInterface +public interface LambdaUtils { +/** + * Run some code, possibly throw some exceptions. + * + * @throws Exception + */ +void run() throws Exception; + +/** + * Provide an idempotent instance of the supplied code - ensure that the supplied code gets run only once, no + * matter how many times .run() is called. + */ +static Runnable idempotent(final Runnable code) { Review Comment: No, it's not used anywhere, I added it as a helper for future. But now I have removed this file completely and moved only the essential functions to `Utils.java` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
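The `idempotent` helper discussed above is documented as ensuring "that the supplied code gets run only once, no matter how many times .run() is called", but its body is not shown and it was later removed. A minimal sketch of that contract, assuming an `AtomicBoolean` guard (the class name `RunOnce` is hypothetical and this is not the PR's implementation):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper class illustrating the documented contract; not part of Kafka.
final class RunOnce {
    private RunOnce() { }

    /**
     * Wraps {@code code} so that it executes at most once, even if the returned
     * Runnable is invoked repeatedly or from several threads.
     */
    static Runnable idempotent(final Runnable code) {
        final AtomicBoolean hasRun = new AtomicBoolean(false);
        return () -> {
            // compareAndSet lets exactly one caller flip the flag and run the code
            if (hasRun.compareAndSet(false, true)) {
                code.run();
            }
        };
    }
}
```

One design consequence of this sketch: the flag is set before `code` runs, so if `code` throws, later calls will not retry it.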
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
divijvaidya commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100383535 ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -2425,17 +2426,38 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer< return clusterResourceListeners; } -private void close(long timeoutMs, boolean swallowException) { +private Timer createTimerForRequest(final Duration timeout) { +// this.time could be null if an exception occurs in constructor prior to setting the this.time field +final Time localTime = (time == null) ? Time.SYSTEM : time; +return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs)); +} + +private void close(Duration timeout, boolean swallowException) { log.trace("Closing the Kafka consumer"); AtomicReference<Throwable> firstException = new AtomicReference<>(); -try { -if (coordinator != null) -coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs))); -} catch (Throwable t) { -firstException.compareAndSet(null, t); -log.error("Failed to close coordinator", t); + +final Timer closeTimer = createTimerForRequest(timeout); +// Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to +// the server in the process of closing which may not respect the overall timeout defined for closing the +// consumer. +if (coordinator != null) { +// This is a blocking call bound by the time remaining in closeTimer +LambdaUtils.swallow(() -> coordinator.close(closeTimer), firstException); Review Comment: Moved to Utils in latest commit.
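The call `LambdaUtils.swallow(() -> coordinator.close(closeTimer), firstException)` replaces the removed try/catch that recorded the first `Throwable` via `firstException.compareAndSet(null, t)`. A minimal sketch of such a helper follows. It assumes a functional interface shaped like the `run() throws Exception` method in `LambdaUtils`. The names `CloseHelpers` and `ThrowingRunnable` are hypothetical, this is not the version that was moved into `Utils`, and the error logging from the original try/catch is omitted.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in; name and signature are inferred from the call site in the diff.
final class CloseHelpers {
    private CloseHelpers() { }

    /** A block of code that may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    /**
     * Runs {@code code}. If it throws, the throwable is recorded in {@code firstException}
     * only when no earlier failure was recorded; otherwise it is swallowed.
     */
    static void swallow(final ThrowingRunnable code, final AtomicReference<Throwable> firstException) {
        try {
            code.run();
        } catch (Throwable t) {
            // Keep only the first failure so close() can rethrow it after every resource is closed
            firstException.compareAndSet(null, t);
        }
    }
}
```

Because each close step is wrapped separately, a failure in the coordinator does not prevent the fetcher from being closed afterwards.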
[GitHub] [kafka] vamossagar12 commented on pull request #13127: KAFKA-14586: Moving StreamResetter to tools
vamossagar12 commented on PR #13127: URL: https://github.com/apache/kafka/pull/13127#issuecomment-1422825052 > @vamossagar12 as you can see from the CI output, there are checkstyle errors. Have you tried to run at least one of the system tests that are using the tool? Hi @fvaleri, yes, I am aware of the checkstyle issue and pointed it out here: https://github.com/apache/kafka/pull/13127#issuecomment-1396439952. I have run the StreamResetter tests and also modified the system test.
[GitHub] [kafka] dajac commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
dajac commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100295718 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -447,6 +430,33 @@ private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builde return client.send(node, request); } +/** + * Send Fetch Request to Kafka cluster asynchronously. + * + * This method is visible for testing. + * + * @return A future that indicates result of sent Fetch request + */ +private RequestFuture<ClientResponse> sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData, + final Node fetchTarget) { Review Comment: nit: Indentation is off here. ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -2425,17 +2426,38 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer< return clusterResourceListeners; } -private void close(long timeoutMs, boolean swallowException) { +private Timer createTimerForRequest(final Duration timeout) { +// this.time could be null if an exception occurs in constructor prior to setting the this.time field +final Time localTime = (time == null) ? Time.SYSTEM : time; +return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs)); +} + +private void close(Duration timeout, boolean swallowException) { log.trace("Closing the Kafka consumer"); AtomicReference<Throwable> firstException = new AtomicReference<>(); -try { -if (coordinator != null) -coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs))); -} catch (Throwable t) { -firstException.compareAndSet(null, t); -log.error("Failed to close coordinator", t); + +final Timer closeTimer = createTimerForRequest(timeout); +// Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to +// the server in the process of closing which may not respect the overall timeout defined for closing the +// consumer.
+if (coordinator != null) { +// This is a blocking call bound by the time remaining in closeTimer +LambdaUtils.swallow(() -> coordinator.close(closeTimer), firstException); Review Comment: We have `closeQuietly` in `Utils` that is pretty close to this one. I think that `swallow` should be moved there as well. We don't need another `*Utils` class. ## clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Lambda helpers. + */ +@FunctionalInterface +public interface LambdaUtils { +/** + * Run some code, possibly throw some exceptions. + * + * @throws Exception + */ +void run() throws Exception; + +/** + * Provide an idempotent instance of the supplied code - ensure that the supplied code gets run only once, no + * matter how many times .run() is called. + */ +static Runnable idempotent(final Runnable code) { Review Comment: Is this one used anywhere? I can't find it.
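In the reviewed `close(Duration, boolean)`, one `closeTimer` from `createTimerForRequest` bounds both the coordinator and the fetcher shutdown, and its length is capped at `requestTimeoutMs`. Sequential close steps therefore draw down one shared budget instead of each getting a fresh timeout. Below is a sketch of that idea using a hypothetical `CloseDeadline` class and an injected millisecond clock so it can be exercised deterministically. Kafka's own `Time` and `Timer` classes are not reproduced here.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical simplified deadline; not Kafka's org.apache.kafka.common.utils.Timer.
final class CloseDeadline {
    private final LongSupplier nowMs;
    private final long deadlineMs;

    private CloseDeadline(final LongSupplier nowMs, final long timeoutMs) {
        this.nowMs = nowMs;
        this.deadlineMs = nowMs.getAsLong() + timeoutMs;
    }

    /** Caps the caller's close timeout at the request timeout, mirroring createTimerForRequest. */
    static CloseDeadline forRequest(final LongSupplier nowMs, final Duration timeout, final long requestTimeoutMs) {
        return new CloseDeadline(nowMs, Math.min(timeout.toMillis(), requestTimeoutMs));
    }

    /** Time left for the next close step; never negative. */
    long remainingMs() {
        return Math.max(0L, deadlineMs - nowMs.getAsLong());
    }

    boolean isExpired() {
        return remainingMs() == 0L;
    }
}
```

Each close step would consult `remainingMs()` before blocking, so a slow coordinator shutdown leaves less time for the fetcher rather than extending the total.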
[GitHub] [kafka] littlehorse-eng commented on pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.
littlehorse-eng commented on PR #13082: URL: https://github.com/apache/kafka/pull/13082#issuecomment-1422802097 @lucasbru I used the repo you sent; the home page loaded albeit without CSS and when I clicked on Streams documentation it yielded the same result as navigating to the file on my localhost. Must be doing something wrong...
[GitHub] [kafka] ouyangnengda opened a new pull request, #13220: Kafka 14253
ouyangnengda opened a new pull request, #13220: URL: https://github.com/apache/kafka/pull/13220 Add the number of members in the log to improve readability. Example logs - `StreamsPartitionAssignor.java` ```xml Before - [2023-02-08 23:22:21,419] INFO stream-thread [] Assigned tasks [0_0, 0_1, 0_2, 0_3] including stateful [] to clients as: ----0001=[activeTasks: ([0_0, 0_2, 0_3]) standbyTasks: ([])] ----0002=[activeTasks: ([0_1]) standbyTasks: ([])]. After - [2023-02-07 23:05:29,723] INFO stream-thread [] Assigned 4 tasks [0_0, 0_1, 0_2, 0_3] including 0 stateful [] to 2 clients as: ----0001=[3 activeTasks: ([0_0, 0_2, 0_3]) 0 standbyTasks: ([])] ----0002=[1 activeTasks: ([0_1]) 0 standbyTasks: ([])]. ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
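The proposed change prefixes each task list in the assignment log with its size. The hypothetical formatter below reproduces the "After" layout from the example. It is not the PR's code, and it assumes task ids and client ids are plain strings (the client ids in the example appear redacted, so they are used as opaque placeholders).

```java
import java.util.List;

// Hypothetical formatter for the proposed log layout; not StreamsPartitionAssignor's code.
final class AssignmentLog {
    private AssignmentLog() { }

    /** Header line: total task count, stateful count, and client count. */
    static String summary(final List<String> allTasks, final List<String> statefulTasks, final int clientCount) {
        return "Assigned " + allTasks.size() + " tasks " + allTasks
            + " including " + statefulTasks.size() + " stateful " + statefulTasks
            + " to " + clientCount + " clients as:";
    }

    /** One client's entry, with each task list prefixed by its size. */
    static String describeClient(final String clientId, final List<String> active, final List<String> standby) {
        return clientId + "=[" + active.size() + " activeTasks: (" + active + ") "
            + standby.size() + " standbyTasks: (" + standby + ")]";
    }
}
```

For example, `describeClient("----0001", Arrays.asList("0_0", "0_2", "0_3"), Collections.emptyList())` yields `----0001=[3 activeTasks: ([0_0, 0_2, 0_3]) 0 standbyTasks: ([])]`, matching the first client entry in the "After" log.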
[GitHub] [kafka] dajac commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
dajac commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100288626 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -1933,11 +1943,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) { } } +// Visible for testing +void maybeCloseFetchSessions(final Timer timer) { +final Cluster cluster = metadata.fetch(); +final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>(); +for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) { +final FetchSessionHandler sessionHandler = entry.getValue(); +// set the session handler to notify close. This will set the next metadata request to send close message. +sessionHandler.notifyClose(); + +final int sessionId = sessionHandler.sessionId(); +final Integer fetchTargetNodeId = entry.getKey(); +// FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will +// skip sending the close request. +final Node fetchTarget = cluster.nodeById(fetchTargetNodeId); +if (fetchTarget == null || client.isUnavailable(fetchTarget)) { +log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget); +continue; +} + +final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget); +responseFuture.addListener(new RequestFutureListener<ClientResponse>() { +@Override +public void onSuccess(ClientResponse value) { +log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget); +} + +@Override +public void onFailure(RuntimeException e) { +log.debug("Unable to a close message for fetch session: {} to node: {}. " + +"This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e); +} +}); + +requestFutures.add(responseFuture); +} + +// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until +// all requests have received a response.
+do { +client.poll(timer, null, true); +} while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)); + +if (!requestFutures.stream().allMatch(RequestFuture::isDone)) { +// we ran out of time before completing all futures. It is ok since we don't want to block the shutdown +// here. +log.debug("All requests couldn't be sent in the specific timeout period {}ms. " + +"This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " + +"KafkaConsumer.close(Duration timeout)", timer.timeoutMs()); +} +} + +public void close(final Timer timer) { +if (!isClosed.compareAndSet(false, true)) { +log.info("Fetcher {} is already closed.", this); +return; +} + +// Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence, +// it is necessary to acquire a lock on the fetcher instance before modifying the states. Review Comment: Thanks for the clarification. That makes sense.
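`maybeCloseFetchSessions` sends one close request per known fetch session. It then polls until either `timer` expires or every `RequestFuture` is done, and it only logs at debug level if time runs out, so shutdown is never blocked. The same best-effort wait can be sketched with `CompletableFuture`, swapped in here for Kafka's `RequestFuture` and the client `poll` loop (so this sketch drives no network I/O while waiting). `BestEffortClose` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a bounded, best-effort wait; not the Fetcher's implementation.
final class BestEffortClose {
    private BestEffortClose() { }

    /**
     * Waits until every future completes or the timeout elapses, whichever comes first.
     * Returns true only if all futures completed. A failed future counts as completed,
     * because the close requests are best effort.
     */
    static boolean awaitAll(final List<CompletableFuture<?>> futures, final long timeoutMs) {
        final CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // allOf fails only after every future is done and at least one failed
        } catch (TimeoutException e) {
            // Out of time: stop waiting rather than block shutdown
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller and stop waiting
            Thread.currentThread().interrupt();
        }
        return futures.stream().allMatch(CompletableFuture::isDone);
    }
}
```

A `false` result corresponds to the PR's debug message suggesting a larger timeout for `KafkaConsumer.close(Duration timeout)`.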
[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685935#comment-17685935 ] Mickael Maison commented on KAFKA-14692: cc [~mumrah] > Issues in Zookeeper to KRaft migration docs > --- > > Key: KAFKA-14692 > URL: https://issues.apache.org/jira/browse/KAFKA-14692 > Project: Kafka > Issue Type: Improvement > Components: docs >Reporter: Mickael Maison >Priority: Major > > Following [https://kafka.apache.org/documentation/#kraft_zk_migration] > 1) It completely skips the fact that the storage for the new quorum should > be formatted using the existing cluster id. > 2) In Provisioning the KRaft controller quorum: > {{controller.quorum.voters=1@localhost:9093}} should be > {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000 > 3) When migrating brokers, it states: > {code:java} > # Don't set the IBP, KRaft uses "metadata.version" feature flag > # inter.broker.protocol.version=3.4 > # Keep the migration enabled > zookeeper.metadata.migration.enable=true > # Remove ZooKeeper client configuration > # zookeeper.connect=localhost:2181 > {code} > However, if I do that, my brokers fail to restart and print: > {code:java} > org.apache.kafka.common.config.ConfigException: If using > zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must > also be set. > {code} > 4) When disabling zookeeper.metadata.migration.enable or keeping > zookeeper.connect to get past this step, when brokers restart they print a > lot of error messages: > {code:java} > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key LEADER_AND_ISR which is not enabled > [2023-02-08 12:06:25,776] ERROR Exception while processing request from > 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor) > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
[ https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-14692: --- Description: Following [https://kafka.apache.org/documentation/#kraft_zk_migration] 1) It completely skips the fact that the storage for the new quorum should be formatted using the existing cluster id. 2) In Provisioning the KRaft controller quorum: {{controller.quorum.voters=1@localhost:9093}} should be {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000 3) When migrating brokers, it states: {code:java} # Don't set the IBP, KRaft uses "metadata.version" feature flag # inter.broker.protocol.version=3.4 # Keep the migration enabled zookeeper.metadata.migration.enable=true # Remove ZooKeeper client configuration # zookeeper.connect=localhost:2181 {code} However, if I do that, my brokers fail to restart and print: {code:java} org.apache.kafka.common.config.ConfigException: If using zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must also be set. {code} 4) When disabling zookeeper.metadata.migration.enable or keeping zookeeper.connect to get past this step, when brokers restart they print a lot of error messages: {code:java} org.apache.kafka.common.errors.InvalidRequestException: Received request api key LEADER_AND_ISR which is not enabled [2023-02-08 12:06:25,776] ERROR Exception while processing request from 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor) {code} was: Following https://kafka.apache.org/documentation/#kraft_zk_migration 1) It completely skips the fact that the storage for the new quorum should be formatted using the existing cluster id. 
2) In Provisioning the KRaft controller quorum: controller.quorum.voters=1@localhost:9093 should be controller.quorum.voters=3000@localhost:9093 as node.id=3000 3) When migrating brokers, it states: # Don't set the IBP, KRaft uses "metadata.version" feature flag # inter.broker.protocol.version=3.4 # Keep the migration enabled zookeeper.metadata.migration.enable=true # Remove ZooKeeper client configuration # zookeeper.connect=localhost:2181 However, if I do that, my brokers fail to restart and print: org.apache.kafka.common.config.ConfigException: If using zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must also be set. 4) When disabling zookeeper.metadata.migration.enable or keeping zookeeper.connect to get past this step, when brokers restart they print a lot of error messages: org.apache.kafka.common.errors.InvalidRequestException: Received request api key LEADER_AND_ISR which is not enabled [2023-02-08 12:06:25,776] ERROR Exception while processing request from 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
[jira] [Created] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs
Mickael Maison created KAFKA-14692: -- Summary: Issues in Zookeeper to KRaft migration docs Key: KAFKA-14692 URL: https://issues.apache.org/jira/browse/KAFKA-14692 Project: Kafka Issue Type: Improvement Components: docs Reporter: Mickael Maison Following https://kafka.apache.org/documentation/#kraft_zk_migration 1) It completely skips the fact that the storage for the new quorum should be formatted using the existing cluster id. 2) In Provisioning the KRaft controller quorum: controller.quorum.voters=1@localhost:9093 should be controller.quorum.voters=3000@localhost:9093 as node.id=3000 3) When migrating brokers, it states: # Don't set the IBP, KRaft uses "metadata.version" feature flag # inter.broker.protocol.version=3.4 # Keep the migration enabled zookeeper.metadata.migration.enable=true # Remove ZooKeeper client configuration # zookeeper.connect=localhost:2181 However, if I do that, my brokers fail to restart and print: org.apache.kafka.common.config.ConfigException: If using zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must also be set. 4) When disabling zookeeper.metadata.migration.enable or keeping zookeeper.connect to get past this step, when brokers restart they print a lot of error messages: org.apache.kafka.common.errors.InvalidRequestException: Received request api key LEADER_AND_ISR which is not enabled [2023-02-08 12:06:25,776] ERROR Exception while processing request from 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
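Points 2 and 3 of the issue describe two configuration rules that the docs example violated. The first is that a controller's `node.id` should match an id in its own `controller.quorum.voters` entry. The second is that `zookeeper.metadata.migration.enable=true` still requires `zookeeper.connect`. Below is a hypothetical pre-flight check that mirrors only those two rules, assuming a static voter list in the `id@host:port` form used in Kafka 3.4. Kafka performs its own validation at startup (the `ConfigException` quoted above). `MigrationConfigCheck` is not a Kafka class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check mirroring points 2 and 3 of the issue; not part of Kafka.
final class MigrationConfigCheck {
    private MigrationConfigCheck() { }

    /** Returns human-readable problems; an empty list means neither rule was violated. */
    static List<String> problems(final Properties props) {
        final List<String> problems = new ArrayList<>();

        // Point 2: a controller's own node.id should appear as a voter id
        final String nodeId = props.getProperty("node.id");
        final String roles = props.getProperty("process.roles", "");
        if (nodeId != null && roles.contains("controller")) {
            final String voters = props.getProperty("controller.quorum.voters", "");
            boolean listed = false;
            // Each voter entry is assumed to have the form id@host:port
            for (final String voter : voters.split(",")) {
                final String voterId = voter.trim().split("@", 2)[0];
                if (voterId.equals(nodeId.trim())) {
                    listed = true;
                }
            }
            if (!listed) {
                problems.add("node.id " + nodeId + " is not listed in controller.quorum.voters=" + voters);
            }
        }

        // Point 3: migration mode still needs the ZooKeeper connection string
        final boolean migrating = Boolean.parseBoolean(
            props.getProperty("zookeeper.metadata.migration.enable", "false"));
        if (migrating && props.getProperty("zookeeper.connect") == null) {
            problems.add("zookeeper.metadata.migration.enable=true requires zookeeper.connect to be set");
        }
        return problems;
    }
}
```

Run against a controller config with `node.id=3000` and `controller.quorum.voters=1@localhost:9093`, this flags one problem. It also flags the broker config from point 3, where `zookeeper.connect` is commented out while migration stays enabled.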
[GitHub] [kafka] pprovenzano commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1422710574 I plan to update existing tests after bootstrap support is committed. I'll check on the `handleDescribeUserScramCredentialsRequest`
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13137: KAFKA-10586: Intra-cluster communication for Mirror Maker 2
viktorsomogyi commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1100211704 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java: ## @@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) { // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than // tracking the various shared admin objects in this class. -// Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, -advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin); +advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY, +restNamespace, sharedAdmin); herders.put(sourceAndTarget, herder); } +private static String encodePath(String rawPath) throws UnsupportedEncodingException { +return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name()) +// Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'), +// and pluses as '%2B' +// But Jetty doesn't decode pluses at all and leaves them as-are in decoded +// URLs +// So to get around that, we replace pluses in the encoded URL here with '%20', +// which is the encoding that Jetty expects for spaces +// Jetty will reverse this transformation when evaluating the path parameters +// and will return decoded strings with all special characters as they were. Review Comment: Ok, I'm fine with it.
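The comment in `encodePath` explains why pluses are rewritten. `URLEncoder` produces form encoding, in which a space becomes `+` and a literal `+` becomes `%2B`. After encoding, therefore, every remaining `+` stands for a space and can safely be replaced with `%20`, which Jetty decodes back to a space in a path. A minimal sketch of that step follows. `PathEncoding` is a hypothetical name, and the sketch uses the `Charset` overload of `URLEncoder.encode` (Java 10+) instead of the PR's `StandardCharsets.UTF_8.name()` overload, which also works on Java 8 at the cost of a checked `UnsupportedEncodingException`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Sketch of the encoding step described in the diff comment; not MirrorMaker's code.
final class PathEncoding {
    private PathEncoding() { }

    static String encodePath(final String rawPath) {
        // Form encoding: ' ' becomes '+', a literal '+' becomes "%2B", '/' becomes "%2F"
        final String formEncoded = URLEncoder.encode(rawPath, StandardCharsets.UTF_8);
        // Every remaining '+' therefore stands for a space; "%20" is what Jetty decodes as a space
        return formEncoded.replace("+", "%20");
    }
}
```

For instance, `encodePath("a b+c/d")` gives `a%20b%2Bc%2Fd`, so the space, the plus, and the slash all survive a round trip through a path segment.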