[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660250731 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,62 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private Map> nameFutures; +private Map> topicIdFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult() {} Review comment: Ah, I remember the issue with that being private: there is a test that extends this class. > @hachikuji I originally wanted the constructor for DeleteTopicsResult to be private, but InternalTopicManagerTest required creating a subclass. https://github.com/apache/kafka/pull/10892#issuecomment-862876282 The issue with just using the static method is that it is protected in the class. So the options are to either keep the constructor protected (as it is now), make the static method public, or change the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13005) Support JBOD in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-13005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371005#comment-17371005 ] dengziming commented on KAFKA-13005: This is currently blocked by KAFKA-9837, I will do this ;) > Support JBOD in kraft mode > -- > > Key: KAFKA-13005 > URL: https://issues.apache.org/jira/browse/KAFKA-13005 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: dengziming >Priority: Major > Labels: kip-500 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13005) Support JBOD in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-13005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming reassigned KAFKA-13005: -- Assignee: dengziming > Support JBOD in kraft mode > -- > > Key: KAFKA-13005 > URL: https://issues.apache.org/jira/browse/KAFKA-13005 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: dengziming >Priority: Major > Labels: kip-500
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
guozhangwang commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-870174294 You were referring to this commit https://github.com/apache/kafka/pull/10798/commits/68a947c0eb6a5cc4bdde24083c83f4638e708edb for the tweaks, right? BTW, it's a bit interesting to see that the improvement for range is around 1.442 / 1.131 ≈ 1.27, while for putAll it is 1.065 / 0.919 ≈ 1.16. Given that the key lengths are similar in the benchmarks, I was expecting the latter to have a bigger benefit. @cadonna WDYT?
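For reference, both figures are plain ratios of the before and after benchmark averages. A minimal Java check, assuming the four numbers are those averages and rounding half-up to two decimals (`SpeedupCheck` is an illustrative name, not part of the PR):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class SpeedupCheck {
    // Ratio of the baseline time to the improved time; a value > 1 means the change is faster.
    static double speedup(double baseline, double improved) {
        return baseline / improved;
    }

    // Rounds a ratio half-up to two decimal places for reporting.
    static BigDecimal twoDecimals(double value) {
        return BigDecimal.valueOf(value).setScale(2, RoundingMode.HALF_UP);
    }
}
```

With these inputs the range ratio is just under 1.275, so it rounds to 1.27, while the putAll ratio is about 1.159, which rounds to 1.16.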
[GitHub] [kafka] jsancio commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot
jsancio commented on a change in pull request #10899: URL: https://github.com/apache/kafka/pull/10899#discussion_r660227382 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -100,4 +116,21 @@ public void close() { new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize) ); } + +/** + * Returns the next non-control Batch + */ +private Optional> nextBatch() { +while (iterator.hasNext()) { +Batch batch = iterator.next(); + +if (batch.records().isEmpty()) { +continue; +} else { +return Optional.of(batch); +} Review comment:
```suggestion
if (!batch.records().isEmpty()) {
    return Optional.of(batch);
}
```
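The suggested loop skips empty batches and returns the first non-empty one. A self-contained sketch of the same pattern, assuming a plain `List<T>` as a stand-in for Kafka's `Batch<T>` (only the emptiness check on `records()` matters here; `NonEmptyBatches` is a hypothetical name):

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

final class NonEmptyBatches {
    // Returns the next non-empty batch, or Optional.empty() once the iterator is exhausted.
    // Each batch is modeled as a List<T>; an empty list plays the role of a batch whose
    // records() is empty.
    static <T> Optional<List<T>> nextBatch(Iterator<List<T>> iterator) {
        while (iterator.hasNext()) {
            List<T> batch = iterator.next();
            if (!batch.isEmpty()) {
                return Optional.of(batch);
            }
        }
        return Optional.empty();
    }
}
```

Inverting the condition removes the need for both `continue` and the `else` branch.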
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660228674 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -471,16 +512,26 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}. Possible duplicate request.") -FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) +FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { +var topLevelError = Errors.NONE // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent +// It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error. val partitionIter = new PartitionIterator(updates.entrySet.iterator, true) while (partitionIter.hasNext) { - partitionIter.next() + val entry = partitionIter.next() + if (entry.getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) { Review comment: The topic ID should not change in the log once it is set. I think what you said in the last sentence is correct: my understanding is that once the log is closed, it can no longer be read from.
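The diff promotes INCONSISTENT_TOPIC_ID to the top-level error if any partition in the response carries it. A simplified Java sketch of that aggregation, assuming a hypothetical `PartitionError` enum in place of Kafka's `Errors` class and a plain map in place of `FetchSession.RESP_MAP`:

```java
import java.util.Map;

final class TopLevelErrorSketch {
    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
    enum PartitionError { NONE, INCONSISTENT_TOPIC_ID, NOT_LEADER_OR_FOLLOWER }

    // Returns INCONSISTENT_TOPIC_ID if any partition reported it, otherwise NONE.
    // Other per-partition errors stay at the partition level and do not affect the result.
    static PartitionError topLevelError(Map<String, PartitionError> partitionErrors) {
        for (PartitionError error : partitionErrors.values()) {
            if (error == PartitionError.INCONSISTENT_TOPIC_ID) {
                return PartitionError.INCONSISTENT_TOPIC_ID;
            }
        }
        return PartitionError.NONE;
    }
}
```

Unlike this sketch, the loop in the diff keeps iterating after a match, because `PartitionIterator` also prunes updates that do not need to be sent.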
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660228129 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -296,11 +276,24 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { // may not be any partitions at all in the response. For this reason, the top-level error code // is essential for them. Errors error = Errors.forException(e); -LinkedHashMap responseData = new LinkedHashMap<>(); -for (Map.Entry entry : fetchData.entrySet()) { -responseData.put(entry.getKey(), FetchResponse.partitionResponse(entry.getKey().partition(), error)); +List topicResponseList = new ArrayList<>(); +// Since UNKNOWN_TOPIC_ID is a new error type only returned when topic ID requests are made (from newer clients), +// we can skip returning the error on all partitions and returning any partitions at all. +if (error != Errors.UNKNOWN_TOPIC_ID) { Review comment: Yeah. I agree it is a bit weird. We can update as you mentioned.
[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660211602 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -471,16 +512,26 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}. Possible duplicate request.") -FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) +FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { +var topLevelError = Errors.NONE // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent +// It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error. val partitionIter = new PartitionIterator(updates.entrySet.iterator, true) while (partitionIter.hasNext) { - partitionIter.next() + val entry = partitionIter.next() + if (entry.getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) { Review comment: > I'm still not sure I follow "pending fetch request could still reference the outdated Partition object and therefore miss the topicId change" My understanding is that the log is the source of truth and we will either read from the log if it matches and not read if it doesn't. I see we could get an error erroneously if the partition didn't update in time, but I don't see us being able to read from the log due to a stale partition. > > Or are you referring to the getPartitionOrException(tp) call picking up a stale partition and both the request and the partition are stale? In this case, we will read from the log, but will identify it with its correct ID. The client will handle based on this. A fetch request may pass the topicId check in ReplicaManager and is about to call log.read(), when the topicId changes. 
I was wondering in that case, if log.read() could return data that corresponds to the old topicId. It seems that's not possible since Log.close() closes all segments. ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -354,38 +377,55 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param cache The fetch session cache. * @param reqMetadataThe request metadata. * @param fetchData The partition data from the fetch request. + * @param usesTopicIds True if this session should use topic IDs. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val usesTopicIds: Boolean, + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { override def getFetchOffset(part: TopicPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) - override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { -fetchData.forEach(fun(_, _)) + override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = { +fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { -FetchResponse.sizeOf(versionId, updates.entrySet.iterator) +FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = { -def createNewSession: FetchSession.CACHE_MAP = { +var topLevelError = Errors.NONE +def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) + 
val sessionTopicIds = new util.HashMap[String, Uuid](updates.size) updates.forEach { (part, respData) => +if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) { + info(s"Session encountered an inconsistent topic ID for topicPartition $part.") + topLevelError = Errors.INCONSISTENT_TOPIC_ID +} val reqData = fetchData.get(part) -cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData)) +val id = topicIds.getOrDefault(part.topic(),
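In the new `FullFetchContext.foreachPartition`, each partition is passed to the callback together with the topic ID looked up by name (`topicIds.get(tp.topic)`), which is null when no ID is known for that topic. A Java sketch of that shape, assuming a hypothetical `PartitionCallback` interface (the JDK has no three-argument consumer) and using `java.util.UUID` and a minimal `TopicPartition` class in place of Kafka's `Uuid` and `TopicPartition`:

```java
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

final class FetchContextSketch {
    // Minimal stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Hypothetical three-argument callback; java.util.function only provides BiConsumer.
    @FunctionalInterface
    interface PartitionCallback<D> {
        void accept(TopicPartition tp, UUID topicId, D data);
    }

    // Visits every requested partition together with the topic ID resolved by topic name.
    // Map.get returns null for topics without a known ID, as topicIds.get(tp.topic) does.
    static <D> void foreachPartition(Map<TopicPartition, D> fetchData,
                                     Map<String, UUID> topicIds,
                                     PartitionCallback<D> fun) {
        fetchData.forEach((tp, data) -> fun.accept(tp, topicIds.get(tp.topic), data));
    }
}
```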
[GitHub] [kafka] jacky1193610322 commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)
jacky1193610322 commented on a change in pull request #10931: URL: https://github.com/apache/kafka/pull/10931#discussion_r660204460 ## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.TopicRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + + +/** + * Represents changes to the topics in the metadata image. + */ +public final class TopicsDelta { +private final TopicsImage image; + +/** + * A map from topic IDs to the topic deltas for each topic. Topics which have been + * deleted will not appear in this map. + */ +private final Map changedTopics = new HashMap<>(); + +/** + * The IDs of topics that exist in the image but that have been deleted. 
Note that if + * a topic does not exist in the image, it will also not exist in this set. Topics + * that are created and then deleted within the same delta will leave no trace. + */ +private final Set deletedTopicIds = new HashSet<>(); Review comment: I'm sorry too. I have no concerns as long as we always replay each record exactly once, even if the broker crashes.
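The Javadoc on `deletedTopicIds` implies a replay rule: removing a topic that exists in the base image records its ID, while removing a topic created within the same delta just drops it from `changedTopics`. A simplified sketch of that rule, assuming hypothetical `createTopic`/`removeTopic` methods and a name string in place of the real `TopicRecord`/`RemoveTopicRecord` replay and `TopicDelta` values:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class TopicsDeltaSketch {
    private final Set<UUID> imageTopicIds;                    // topics present in the base image
    final Map<UUID, String> changedTopics = new HashMap<>();  // topic ID -> name (simplified delta)
    final Set<UUID> deletedTopicIds = new HashSet<>();

    TopicsDeltaSketch(Set<UUID> imageTopicIds) {
        this.imageTopicIds = imageTopicIds;
    }

    // Replay of a topic creation: the topic now appears as changed.
    void createTopic(UUID id, String name) {
        changedTopics.put(id, name);
    }

    // Replay of a topic removal.
    void removeTopic(UUID id) {
        changedTopics.remove(id);
        // Only topics that exist in the image are remembered as deleted; a topic created
        // and deleted within the same delta leaves no trace.
        if (imageTopicIds.contains(id)) {
            deletedTopicIds.add(id);
        }
    }
}
```

Because `deletedTopicIds` is a set, replaying the same removal twice has no extra effect in this sketch; whether the real delta tolerates duplicate replays in general is the question raised in the review.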
[GitHub] [kafka] jsancio commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot
jsancio commented on a change in pull request #10899: URL: https://github.com/apache/kafka/pull/10899#discussion_r660209060 ## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ## @@ -654,14 +657,69 @@ private static void writeLeaderChangeMessage(ByteBuffer buffer, long timestamp, int leaderEpoch, LeaderChangeMessage leaderChangeMessage) { +try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder( +buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, +TimestampType.CREATE_TIME, initialOffset, timestamp, +RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, +false, true, leaderEpoch, buffer.capacity()) +) { +builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage); +} +} + +public static MemoryRecords withSnapshotHeaderRecord( +long initialOffset, +long timestamp, +int leaderEpoch, +ByteBuffer buffer, +SnapshotHeaderRecord snapshotHeaderRecord +) { +writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord); +buffer.flip(); +return MemoryRecords.readableRecords(buffer); +} + +private static void writeSnapshotHeaderRecord(ByteBuffer buffer, +long initialOffset, +long timestamp, +int leaderEpoch, +SnapshotHeaderRecord snapshotHeaderRecord +) { +try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder( +buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, +TimestampType.CREATE_TIME, initialOffset, timestamp, +RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, +false, true, leaderEpoch, buffer.capacity()) +) { +builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord); +} +} + +public static MemoryRecords withSnapshotFooterRecord( +long initialOffset, +long timestamp, +int leaderEpoch, +ByteBuffer buffer, +SnapshotFooterRecord snapshotFooterRecord +) { +writeSnapshotFooterRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotFooterRecord); +buffer.flip(); +return 
MemoryRecords.readableRecords(buffer); +} + +private static void writeSnapshotFooterRecord(ByteBuffer buffer, +long initialOffset, +long timestamp, +int leaderEpoch, +SnapshotFooterRecord snapshotFooterRecord +) { MemoryRecordsBuilder builder = new MemoryRecordsBuilder( buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, initialOffset, timestamp, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, true, leaderEpoch, buffer.capacity() ); -builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage); +builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord); builder.close(); Review comment: We should use Java's try-with-resources here.
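Try-with-resources guarantees that `close()` runs even if the append throws, which the explicit `builder.close()` call does not. A minimal sketch of the pattern, assuming a hypothetical `RecordsBuilder` that implements `AutoCloseable` (the diff's other helpers already use `MemoryRecordsBuilder` this way, but its constructor and append methods are not reproduced here):

```java
import java.util.ArrayList;
import java.util.List;

final class TryWithResourcesSketch {
    // Hypothetical builder; close() stands in for MemoryRecordsBuilder.close().
    static final class RecordsBuilder implements AutoCloseable {
        final List<String> records = new ArrayList<>();
        boolean closed = false;

        void append(String record) {
            if (record == null) {
                throw new IllegalArgumentException("record must not be null");
            }
            records.add(record);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // The builder is closed when the block exits, whether append() returns normally or throws.
    static void writeFooter(RecordsBuilder builder, String footer) {
        try (RecordsBuilder b = builder) {
            b.append(footer);
        }
    }
}
```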
[jira] [Resolved] (KAFKA-12631) Support api to resign raft leadership
[ https://issues.apache.org/jira/browse/KAFKA-12631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12631. - Fix Version/s: 3.0.0 Resolution: Fixed > Support api to resign raft leadership > - > > Key: KAFKA-12631 > URL: https://issues.apache.org/jira/browse/KAFKA-12631 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > > It is useful to allow the controller to explicitly resign after encountering > an error of some kind. The Raft state machine implements a Resigned state, > but it is only currently used during graceful shutdown. > This work depends on both of the following jiras: > - KAFKA-12342: Adds resign() api after merging MetaLogManager interface > - KAFKA-12607: Adds support for granting votes while in the Resigned state
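The issue asks for a way to enter the Resigned state on demand rather than only during graceful shutdown. A toy state machine illustrating the idea, assuming a hypothetical `resign(epoch)` that only acts when the node is leader at the named epoch; this is not the actual `KafkaRaftClient` logic, which also coordinates with the other voters:

```java
final class RaftRoleSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER, RESIGNED }

    private Role role;
    private final int epoch;

    RaftRoleSketch(Role role, int epoch) {
        this.role = role;
        this.epoch = epoch;
    }

    Role role() {
        return role;
    }

    // Hypothetical resign: a leader at the requested epoch moves to RESIGNED and returns true.
    // A stale epoch or a non-leader role leaves the state unchanged and returns false.
    boolean resign(int requestedEpoch) {
        if (role == Role.LEADER && requestedEpoch == epoch) {
            role = Role.RESIGNED;
            return true;
        }
        return false;
    }
}
```

Checking the epoch keeps a late resignation request, issued for an older leadership term, from affecting a newer one.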
[GitHub] [kafka] hachikuji merged pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
hachikuji merged pull request #10913: URL: https://github.com/apache/kafka/pull/10913
[GitHub] [kafka] jacky1193610322 commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)
jacky1193610322 commented on a change in pull request #10931: URL: https://github.com/apache/kafka/pull/10931#discussion_r660204460 ## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.TopicRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + + +/** + * Represents changes to the topics in the metadata image. + */ +public final class TopicsDelta { +private final TopicsImage image; + +/** + * A map from topic IDs to the topic deltas for each topic. Topics which have been + * deleted will not appear in this map. + */ +private final Map changedTopics = new HashMap<>(); + +/** + * The IDs of topics that exist in the image but that have been deleted. 
Note that if + * a topic does not exist in the image, it will also not exist in this set. Topics + * that are created and then deleted within the same delta will leave no trace. + */ +private final Set deletedTopicIds = new HashSet<>(); Review comment: I'm sorry too. I have no problem with it as long as we always replay each record exactly once, even if the broker crashes.
[GitHub] [kafka] cmccabe commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot
cmccabe commented on a change in pull request #10899: URL: https://github.com/apache/kafka/pull/10899#discussion_r660199695 ## File path: clients/src/main/resources/common/message/MetadataSnapshotFooterRecord.json ## @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "MetadataSnapshotFooterRecord", Review comment: Perhaps. However, if we needed a specific metadata header or footer, we'd do that as a metadata record.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660199209 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -277,14 +277,18 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { // is essential for them. Errors error = Errors.forException(e); List topicResponseList = new ArrayList<>(); -data.topics().forEach(topic -> { -List partitionResponses = topic.partitions().stream().map(partition -> -FetchResponse.partitionResponse(partition.partition(), error)).collect(Collectors.toList()); -topicResponseList.add(new FetchResponseData.FetchableTopicResponse() -.setTopic(topic.topic()) -.setTopicId(topic.topicId()) -.setPartitions(partitionResponses)); -}); +// Since UNKNOWN_TOPIC_ID is a new error type only returned when topic ID requests are made (from newer clients), Review comment: We need to do something like this to easily return the top-level error with no partition responses for UNKNOWN_TOPIC_ID. I think this works, but we may want a version check as well, just to be safe.
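The change returns only a top-level error, with no partition entries, for UNKNOWN_TOPIC_ID, and the comment suggests also guarding on the request version. A simplified sketch of that branching, assuming a hypothetical `FetchError` enum, string partition keys, and a hypothetical `minTopicIdVersion` threshold (the real code builds `FetchResponseData` objects):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class FetchErrorResponseSketch {
    // Hypothetical subset of Kafka's Errors.
    enum FetchError { UNKNOWN_TOPIC_ID, NOT_LEADER_OR_FOLLOWER }

    // Builds per-partition error entries ("partition=ERROR"). UNKNOWN_TOPIC_ID is only sent to
    // clients that use topic IDs, which read the top-level error code, so when the request
    // version is new enough no partition entries are returned at all.
    static List<String> partitionErrors(List<String> partitions, FetchError error,
                                        short requestVersion, short minTopicIdVersion) {
        if (error == FetchError.UNKNOWN_TOPIC_ID && requestVersion >= minTopicIdVersion) {
            return Collections.emptyList();
        }
        List<String> responses = new ArrayList<>();
        for (String partition : partitions) {
            responses.add(partition + "=" + error);
        }
        return responses;
    }
}
```

The version guard means an older request that somehow hits this error still receives per-partition entries instead of an empty response.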
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r660196562 ## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ## @@ -1472,11 +1489,30 @@ class LogCleanerTest { time.scheduler.clear() cleanedKeys = LogTestUtils.keysInLog(log) -// 3) Simulate recovery after swap file is created and old segments files are renamed +// 4) Simulate recovery after swap file is created and old segments files are renamed //to .deleted. Clean operation is resumed during recovery. log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) log = recoverAndCheck(config, cleanedKeys) +// add some more messages and clean the log again +while (log.numberOfSegments < 10) { + log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0) + messageCount += 1 +} +for (k <- 1 until messageCount by 2) + offsetMap.put(key(k), Long.MaxValue) +cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), + new CleanedTransactionMetadata) +// clear scheduler so that async deletes don't run +time.scheduler.clear() +cleanedKeys = LogTestUtils.keysInLog(log) + +// 5) Simulate recovery after a subset of swap files are renamed to regular files and old segments files are renamed +//to .deleted. Clean operation is resumed during recovery. +log.logSegments.head.timeIndex.file.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", Log.SwapFileSuffix))) + // .changeFileSuffixes("", Log.SwapFileSuffix) Review comment: My bad. Forgot to delete this line.
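Step 5 simulates a partially completed swap by giving one segment's time index the swap suffix via `CoreUtils.replaceSuffix(path, "", Log.SwapFileSuffix)`. A Java sketch of such a suffix replacement, assuming `replaceSuffix` strips `oldSuffix` from the end of the string (rejecting input that does not end with it) and appends `newSuffix`, and assuming `.swap` as the swap suffix; `SuffixSketch` is a hypothetical name:

```java
final class SuffixSketch {
    // Replaces oldSuffix at the end of s with newSuffix; an empty oldSuffix simply appends.
    static String replaceSuffix(String s, String oldSuffix, String newSuffix) {
        if (!s.endsWith(oldSuffix)) {
            throw new IllegalArgumentException(
                "Expected string to end with '" + oldSuffix + "' but string is '" + s + "'");
        }
        return s.substring(0, s.length() - oldSuffix.length()) + newSuffix;
    }
}
```

Under these assumptions, `00000000000000000000.timeindex` becomes `00000000000000000000.timeindex.swap`. Note that `java.io.File.renameTo` reports failure through its boolean return value, which the test line ignores.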
[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660193316 ## File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Collection; + +/** + * A class used to represent a collection of topics. This collection may define topics by topic name + * or topic ID. Subclassing this class beyond the classes provided here is not supported. + */ +public abstract class TopicCollection { Review comment: I think it's simpler with just the factories, so I like this idea.
[GitHub] [kafka] hachikuji commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
hachikuji commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660179959 ## File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Collection; + +/** + * A class used to represent a collection of topics. This collection may define topics by topic name + * or topic ID. Subclassing this class beyond the classes provided here is not supported. + */ +public abstract class TopicCollection { Review comment: No strong opinion, but I'd probably vote to keep the constructors private. Might be worth getting a second opinion. @ijuma ?
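Both comments point toward an abstract `TopicCollection` whose concrete subclasses can only be obtained through static factories. A sketch of that shape, assuming hypothetical factory names `ofTopicNames`/`ofTopicIds`, nested subclasses, and `java.util.UUID` in place of Kafka's `Uuid`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;

abstract class TopicCollectionSketch {
    // Private constructor: only the nested classes below can extend this class.
    private TopicCollectionSketch() {}

    static TopicNameCollection ofTopicNames(Collection<String> names) {
        return new TopicNameCollection(names);
    }

    static TopicIdCollection ofTopicIds(Collection<UUID> ids) {
        return new TopicIdCollection(ids);
    }

    static final class TopicNameCollection extends TopicCollectionSketch {
        private final Collection<String> names;

        private TopicNameCollection(Collection<String> names) {
            this.names = Collections.unmodifiableList(new ArrayList<>(names));
        }

        Collection<String> topicNames() {
            return names;
        }
    }

    static final class TopicIdCollection extends TopicCollectionSketch {
        private final Collection<UUID> ids;

        private TopicIdCollection(Collection<UUID> ids) {
            this.ids = Collections.unmodifiableList(new ArrayList<>(ids));
        }

        Collection<UUID> topicIds() {
            return ids;
        }
    }
}
```

Keeping every constructor private is what enforces the Javadoc's "subclassing beyond the classes provided here is not supported" at compile time rather than by convention.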
[GitHub] [kafka] hachikuji commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
hachikuji commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660178840 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,62 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private Map> nameFutures; +private Map> topicIdFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult() {} Review comment: Yeah, right. As long as the constructor is private, you can guarantee that one of them is always null.
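With a private constructor and one factory per lookup mode, a result object can guarantee that exactly one of its two future maps is set. A sketch under those assumptions, using `CompletableFuture<Void>` in place of `KafkaFuture<Void>`, `java.util.UUID` in place of `Uuid`, and hypothetical factory and accessor names:

```java
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

final class DeleteTopicsResultSketch {
    // Exactly one of these is non-null; the private constructor lets the factories enforce that.
    private final Map<String, CompletableFuture<Void>> nameFutures;
    private final Map<UUID, CompletableFuture<Void>> topicIdFutures;

    private DeleteTopicsResultSketch(Map<String, CompletableFuture<Void>> nameFutures,
                                     Map<UUID, CompletableFuture<Void>> topicIdFutures) {
        this.nameFutures = nameFutures;
        this.topicIdFutures = topicIdFutures;
    }

    static DeleteTopicsResultSketch ofTopicNames(Map<String, CompletableFuture<Void>> futures) {
        return new DeleteTopicsResultSketch(Objects.requireNonNull(futures, "futures"), null);
    }

    static DeleteTopicsResultSketch ofTopicIds(Map<UUID, CompletableFuture<Void>> futures) {
        return new DeleteTopicsResultSketch(null, Objects.requireNonNull(futures, "futures"));
    }

    // Null when the result was created by ofTopicIds.
    Map<String, CompletableFuture<Void>> topicNameValues() {
        return nameFutures;
    }

    // Null when the result was created by ofTopicNames.
    Map<UUID, CompletableFuture<Void>> topicIdValues() {
        return topicIdFutures;
    }
}
```

The trade-off raised earlier in this thread still applies: a private constructor rules out test subclasses such as the one in InternalTopicManagerTest.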
[jira] [Commented] (KAFKA-12975) Consider how Topic IDs can improve consume experience
[ https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370944#comment-17370944 ] Jason Gustafson commented on KAFKA-12975: - I think the main guarantee we want is that offsets cannot be mistakenly reused on a recreated topic. Once a topic is gone, all its offsets (persistent or otherwise) need to go with it. Ideally the consumer would get a notification in the rebalance listener that the topic was recreated so that the user has a chance to set the initial position. As a starting place, we can associate the in-memory fetch position in the consumer with the topicId that is being fetched. Then if we find the topic ID is deleted, we should delete the position in memory and consult the reset policy. > Consider how Topic IDs can improve consume experience > - > > Key: KAFKA-12975 > URL: https://issues.apache.org/jira/browse/KAFKA-12975 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Priority: Major > > Currently when a consumer subscribes to a topic, it will continue to consume > from this topic across topic deletions and recreations with the same name. By > adding topic IDs to the consumer, we will have more insight for these events. > We should figure out if we want to change consumer logic now that we have > this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
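Following the starting point suggested in the comment above, a fetch position can carry the topic ID it was established against and be dropped when metadata reports a different ID for the same name. The sketch below is hypothetical and is not consumer code. Positions are keyed by topic name rather than `TopicPartition`, IDs are `String`s, and an empty `position` result means "consult the reset policy". It does not model the rebalance-listener notification mentioned in the comment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch (names are illustrative, not consumer internals):
// each fetch position remembers the topic ID it was established against.
// Assumptions: positions are keyed by topic name only, and topic IDs are Strings.
class TopicIdAwarePositions {
    private static final class Position {
        final String topicId;
        final long offset;

        Position(String topicId, long offset) {
            this.topicId = topicId;
            this.offset = offset;
        }
    }

    private final Map<String, Position> positions = new HashMap<>();

    void updatePosition(String topic, String topicId, long offset) {
        positions.put(topic, new Position(topicId, offset));
    }

    // Called with the topic ID from fresh metadata. If the name now maps to a
    // different ID, the topic was recreated: drop the in-memory position.
    // Returns true when a position was discarded.
    boolean onMetadataUpdate(String topic, String currentTopicId) {
        Position current = positions.get(topic);
        if (current != null && !current.topicId.equals(currentTopicId)) {
            positions.remove(topic);
            return true;
        }
        return false;
    }

    // An empty result means there is no valid position and the caller
    // should consult the offset reset policy.
    OptionalLong position(String topic) {
        Position current = positions.get(topic);
        return current == null ? OptionalLong.empty() : OptionalLong.of(current.offset);
    }
}
```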
[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh
IgnacioAcunaF commented on pull request #10858: URL: https://github.com/apache/kafka/pull/10858#issuecomment-870107728 Thanks @dajac for the review and comments! I updated the PR to take them into account.
[jira] [Updated] (KAFKA-12975) Consider how Topic IDs can improve consume experience
[ https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12975: Parent: KAFKA-8872 Issue Type: Sub-task (was: Improvement) > Consider how Topic IDs can improve consume experience > - > > Key: KAFKA-12975 > URL: https://issues.apache.org/jira/browse/KAFKA-12975 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Priority: Major > > Currently when a consumer subscribes to a topic, it will continue to consume > from this topic across topic deletions and recreations with the same name. By > adding topic IDs to the consumer, we will have more insight for these events. > We should figure out if we want to change consumer logic now that we have > this information.
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r660173021 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +63,88 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +// Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined +val commitedOffsets = Map( + testTopicPartition1 -> new OffsetAndMetadata(100), + testTopicPartition2 -> null, + testTopicPartition3 -> new OffsetAndMetadata(100), + testTopicPartition4 -> new OffsetAndMetadata(100), + testTopicPartition5 -> null, +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2) +val unassignedTopicPartitions = 
Set(testTopicPartition3, testTopicPartition4, testTopicPartition5) + +val consumerGroupDescription = new ConsumerGroupDescription(group, + true, + Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))), + classOf[RangeAssignor].getName, + ConsumerGroupState.STABLE, + new Node(1, "localhost", 9092)) + +def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = { + topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions) +} + + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription +when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets)) +when(admin.listOffsets( + ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), + any() +)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava)) Review comment: Sure, will add
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r660172970 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +63,88 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +// Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined +val commitedOffsets = Map( + testTopicPartition1 -> new OffsetAndMetadata(100), + testTopicPartition2 -> null, + testTopicPartition3 -> new OffsetAndMetadata(100), + testTopicPartition4 -> new OffsetAndMetadata(100), + testTopicPartition5 -> null, +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2) +val unassignedTopicPartitions = 
Set(testTopicPartition3, testTopicPartition4, testTopicPartition5) + +val consumerGroupDescription = new ConsumerGroupDescription(group, + true, + Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))), + classOf[RangeAssignor].getName, + ConsumerGroupState.STABLE, + new Node(1, "localhost", 9092)) + +def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = { + topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions) +} + + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription +when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets)) +when(admin.listOffsets( + ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), + any() +)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava)) +when(admin.listOffsets( + ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)), + any() +)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava)) Review comment:
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r660172772 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +63,88 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +// Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined +val commitedOffsets = Map( + testTopicPartition1 -> new OffsetAndMetadata(100), + testTopicPartition2 -> null, + testTopicPartition3 -> new OffsetAndMetadata(100), + testTopicPartition4 -> new OffsetAndMetadata(100), + testTopicPartition5 -> null, +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2) +val unassignedTopicPartitions = 
Set(testTopicPartition3, testTopicPartition4, testTopicPartition5) + +val consumerGroupDescription = new ConsumerGroupDescription(group, + true, + Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))), + classOf[RangeAssignor].getName, + ConsumerGroupState.STABLE, + new Node(1, "localhost", 9092)) + +def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = { + topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions) +} + + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription +when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets)) +when(admin.listOffsets( + ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), + any() +)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava)) +when(admin.listOffsets( + ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)), + any() +)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava)) + +val (state, assignments) = groupService.collectGroupOffsets(group) +val returnedOffsets = assignments.map { results => + results.map { assignment => +new TopicPartition(assignment.topic.get, assignment.partition.get) -> assignment.offset + }.toMap +}.getOrElse(Map.empty) +// Results should have information for all assigned topic partition (even if there is not Offset's information at all, because they get fills with None) +// Results should have information only for unassigned topic partitions 
if and only if there is information about them (including with null values) Review comment: You are right; it's not relevant because all partitions have information now. I'm going to remove it.
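This test exercises committed-offset entries whose value is `null`. One way to handle that case in Java is to wrap each committed offset in an `Optional` before computing lag, so a missing offset yields an empty result instead of a `NullPointerException`. This is a hypothetical sketch, not the `ConsumerGroupCommand` fix. Partitions are `"topic-partition"` strings and committed offsets are `Long` values rather than `OffsetAndMetadata`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, not ConsumerGroupCommand code. Assumptions: partitions
// are "topic-partition" Strings and committed offsets are Longs rather than
// OffsetAndMetadata; a null value means no committed offset was returned.
final class OffsetRows {
    private OffsetRows() {}

    // Wrap every committed offset in an Optional so a null is never dereferenced.
    static Map<String, Optional<Long>> committedOffsets(Map<String, Long> raw) {
        Map<String, Optional<Long>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : raw.entrySet())
            result.put(entry.getKey(), Optional.ofNullable(entry.getValue()));
        return result;
    }

    // Lag is only defined when both the committed offset and the log end offset are known.
    static Optional<Long> lag(Optional<Long> committed, Optional<Long> logEndOffset) {
        return committed.flatMap(c -> logEndOffset.map(end -> end - c));
    }
}
```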
[jira] [Assigned] (KAFKA-12975) Consider how Topic IDs can improve consume experience
[ https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-12975: -- Assignee: (was: Justine Olshan) > Consider how Topic IDs can improve consume experience > - > > Key: KAFKA-12975 > URL: https://issues.apache.org/jira/browse/KAFKA-12975 > Project: Kafka > Issue Type: Improvement >Reporter: Justine Olshan >Priority: Major > > Currently when a consumer subscribes to a topic, it will continue to consume > from this topic across topic deletions and recreations with the same name. By > adding topic IDs to the consumer, we will have more insight for these events. > We should figure out if we want to change consumer logic now that we have > this information.
[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
junrao commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r660164974 ## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ## @@ -1472,11 +1489,30 @@ class LogCleanerTest { time.scheduler.clear() cleanedKeys = LogTestUtils.keysInLog(log) -// 3) Simulate recovery after swap file is created and old segments files are renamed +// 4) Simulate recovery after swap file is created and old segments files are renamed //to .deleted. Clean operation is resumed during recovery. log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) log = recoverAndCheck(config, cleanedKeys) +// add some more messages and clean the log again +while (log.numberOfSegments < 10) { + log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0) + messageCount += 1 +} +for (k <- 1 until messageCount by 2) + offsetMap.put(key(k), Long.MaxValue) +cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), + new CleanedTransactionMetadata) +// clear scheduler so that async deletes don't run +time.scheduler.clear() +cleanedKeys = LogTestUtils.keysInLog(log) + +// 5) Simulate recovery after a subset of swap files are renamed to regular files and old segments files are renamed +//to .deleted. Clean operation is resumed during recovery. +log.logSegments.head.timeIndex.file.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", Log.SwapFileSuffix))) + // .changeFileSuffixes("", Log.SwapFileSuffix) Review comment: Is this still needed?
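Step 5 of the test renames only the first segment's time index to its swap name. A minimal Java sketch of that rename follows. It assumes `replaceSuffix` behaves like Kafka's Scala `CoreUtils.replaceSuffix` and that `Log.SwapFileSuffix` is `".swap"`. The `SwapSuffix` class and the `markAsSwap` helper are hypothetical.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical Java rendering of the rename in step 5 of the quoted test.
// Assumptions: replaceSuffix mirrors Kafka's Scala CoreUtils.replaceSuffix,
// and SWAP_SUFFIX matches Log.SwapFileSuffix (".swap").
final class SwapSuffix {
    static final String SWAP_SUFFIX = ".swap";

    private SwapSuffix() {}

    // Replace oldSuffix with newSuffix; with oldSuffix = "" this simply appends.
    static String replaceSuffix(String s, String oldSuffix, String newSuffix) {
        if (!s.endsWith(oldSuffix))
            throw new IllegalArgumentException("Expected '" + s + "' to end with '" + oldSuffix + "'");
        return s.substring(0, s.length() - oldSuffix.length()) + newSuffix;
    }

    // Rename a single segment file (e.g. a time index) to its .swap name,
    // simulating a crash after only some swap files were renamed.
    static File markAsSwap(File file) throws IOException {
        File target = new File(replaceSuffix(file.getPath(), "", SWAP_SUFFIX));
        // File.renameTo reports failure through its return value, not an exception.
        if (!file.renameTo(target))
            throw new IOException("Failed to rename " + file + " to " + target);
        return target;
    }
}
```

Unlike the quoted test line, the helper checks the boolean returned by `File.renameTo`. A failed rename therefore cannot silently skip the recovery scenario being simulated.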
[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660165327 ## File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Collection; + +/** + * A class used to represent a collection of topics. This collection may define topics by topic name + * or topic ID. Subclassing this class beyond the classes provided here is not supported. + */ +public abstract class TopicCollection { Review comment: Static factories in addition to a public constructor? Or make the constructor private?
[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660164967 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) { } @Override -public DeleteTopicsResult deleteTopics(final Collection topicNames, +public DeleteTopicsResult deleteTopics(final TopicCollection topics, final DeleteTopicsOptions options) { +DeleteTopicsResult result; +if (topics instanceof TopicIdCollection) +result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options))); +else if (topics instanceof TopicNameCollection) +result = DeleteTopicsResult.ofTopicNames(new HashMap<>(handleDeleteTopicsUsingNames(((TopicNameCollection) topics).topicNames(), options))); +else +throw new UnsupportedOperationException("The TopicCollection provided did not match any supported classes for deleteTopics."); Review comment: I was trying to remember the name of that error. I agree.
[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660164864 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,62 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private Map> nameFutures; +private Map> topicIdFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult() {} + +private void setNameFutures(Map> nameFutures) { +this.nameFutures = nameFutures; +} + +private void setTopicIdFutures(Map> topicIdFutures) { +this.topicIdFutures = topicIdFutures; +} + +protected static DeleteTopicsResult ofTopicNames(Map> nameFutures) { +DeleteTopicsResult result = new DeleteTopicsResult(); +result.setNameFutures(nameFutures); +return result; +} + +protected static DeleteTopicsResult ofTopicIds(Map> topicIdFutures) { +DeleteTopicsResult result = new DeleteTopicsResult(); +result.setTopicIdFutures(topicIdFutures); +return result; +} + +/** + * Return a map from topic names to futures which can be used to check the status of + * individual deletions if the deleteTopics request used topic names. Otherwise return null. Review comment: I was following the existing convention in this class, but I can update it.
[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660164712 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,62 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private Map> nameFutures; +private Map> topicIdFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult() {} Review comment: So the idea is that we have two parameters and set one to null in the static methods?
[GitHub] [kafka] hachikuji commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
hachikuji commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r660160050 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,62 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private Map> nameFutures; +private Map> topicIdFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult() {} + +private void setNameFutures(Map> nameFutures) { +this.nameFutures = nameFutures; +} + +private void setTopicIdFutures(Map> topicIdFutures) { +this.topicIdFutures = topicIdFutures; +} + +protected static DeleteTopicsResult ofTopicNames(Map> nameFutures) { +DeleteTopicsResult result = new DeleteTopicsResult(); +result.setNameFutures(nameFutures); +return result; +} + +protected static DeleteTopicsResult ofTopicIds(Map> topicIdFutures) { +DeleteTopicsResult result = new DeleteTopicsResult(); +result.setTopicIdFutures(topicIdFutures); +return result; +} + +/** + * Return a map from topic names to futures which can be used to check the status of + * individual deletions if the deleteTopics request used topic names. Otherwise return null. + */ +public Map> topicNameValues() { +return nameFutures; +} + Review comment: nit: extra newline ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,62 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private Map> nameFutures; +private Map> topicIdFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult() {} Review comment: Could this be private? I think it would be a little cleaner to take `nameFutures` and `topicIdFutures` in the constructor. That would allow us to mark those fields as final. 
## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,62 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private Map> nameFutures; +private Map> topicIdFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult() {} + +private void setNameFutures(Map> nameFutures) { +this.nameFutures = nameFutures; +} + +private void setTopicIdFutures(Map> topicIdFutures) { +this.topicIdFutures = topicIdFutures; +} + +protected static DeleteTopicsResult ofTopicNames(Map> nameFutures) { +DeleteTopicsResult result = new DeleteTopicsResult(); +result.setNameFutures(nameFutures); +return result; +} + +protected static DeleteTopicsResult ofTopicIds(Map> topicIdFutures) { +DeleteTopicsResult result = new DeleteTopicsResult(); +result.setTopicIdFutures(topicIdFutures); +return result; +} + +/** + * Return a map from topic names to futures which can be used to check the status of + * individual deletions if the deleteTopics request used topic names. Otherwise return null. Review comment: nit: can we use javadoc `@return`? ## File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Collection; + +/** + * A class used to represent a collection of topics. This collection may define topics by topic name + * or topic ID. Subclassing this class beyond the classes provided here is not supported. + */ +public abstract class TopicCollection { + +private TopicCollection() {} + +/** + * A class used to represent a collection of topics defined by their topic ID. + * Subclassing this class beyond the classes provided here is not supported. + */ +public static class TopicIdCollection
[GitHub] [kafka] mjsax commented on pull request #10824: KAFKA-12718: SessionWindows are closed too early
mjsax commented on pull request #10824: URL: https://github.com/apache/kafka/pull/10824#issuecomment-870094158 Thanks for your PR! Merged to `trunk`.
[GitHub] [kafka] mjsax merged pull request #10824: KAFKA-12718: SessionWindows are closed too early
mjsax merged pull request #10824: URL: https://github.com/apache/kafka/pull/10824
[GitHub] [kafka] mjsax merged pull request #10893: KAFKA-12909: add missing tests
mjsax merged pull request #10893: URL: https://github.com/apache/kafka/pull/10893
[GitHub] [kafka] mjsax commented on pull request #10894: KAFKA-12951: restore must terminate for tx global topic
mjsax commented on pull request #10894: URL: https://github.com/apache/kafka/pull/10894#issuecomment-870089809 Merged to `trunk` and cherry-picked to `2.8` and `2.7` branches.
[GitHub] [kafka] guozhangwang commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs
guozhangwang commented on pull request #10930: URL: https://github.com/apache/kafka/pull/10930#issuecomment-870078185 > @guozhangwang With Errors.NONE, we throw OffsetOutOfRangeException in the follower when attempting to update follower's start offset based on the leader's start offset returned in the response: I see. I thought you meant there were some conditions on the follower's side that could still make sure we capture this error. Now I realize this condition may or may not actually be hit, and either case is bad: 1. If it is not hit, we would end up missing this error and proceed as if nothing went wrong. 2. If it is hit, we throw OOO (OffsetOutOfRangeException) on the follower's side to capture it, but we also move the partition to the failed state, and we would not be able to recover from that state. If my understanding here is correct, I think I can go ahead and merge the PR. BTW, could you re-trigger the unit tests?
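The behaviour named in the PR title is to return `OFFSET_OUT_OF_RANGE` when `fetchOffset < startOffset`, even if the epoch has diverged. It comes down to the order of two checks on the leader. The sketch below is hypothetical: `FetchError`, `FetchValidation` and the `epochDiverges` flag are illustrative names, and the real broker derives divergence from the follower's last fetched epoch rather than receiving a boolean.

```java
// Hypothetical sketch of the check order named in the PR title, not the
// broker's actual fetch path. Assumption: "epochDiverges" stands in for the
// result of comparing the follower's last fetched epoch with the leader's.
enum FetchError { NONE, OFFSET_OUT_OF_RANGE }

final class FetchValidation {
    static final class Result {
        final FetchError error;
        final boolean divergingEpoch;

        Result(FetchError error, boolean divergingEpoch) {
            this.error = error;
            this.divergingEpoch = divergingEpoch;
        }
    }

    private FetchValidation() {}

    static Result validate(long fetchOffset, long logStartOffset, boolean epochDiverges) {
        // Checked first: a fetch below the log start offset is out of range
        // whether or not the epoch has also diverged.
        if (fetchOffset < logStartOffset)
            return new Result(FetchError.OFFSET_OUT_OF_RANGE, false);
        // Otherwise report any divergence alongside NONE so the follower can truncate.
        return new Result(FetchError.NONE, epochDiverges);
    }
}
```

Checking the log start offset first means the follower never receives `NONE` together with a diverging epoch for an offset the leader no longer has. That sidesteps both outcomes described in the comment above.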
[jira] [Updated] (KAFKA-13005) Support JBOD in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-13005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-13005: - Labels: kip-500 (was: ) > Support JBOD in kraft mode > -- > > Key: KAFKA-13005 > URL: https://issues.apache.org/jira/browse/KAFKA-13005 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Labels: kip-500
[GitHub] [kafka] jsancio commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…
jsancio commented on a change in pull request #10932: URL: https://github.com/apache/kafka/pull/10932#discussion_r660142781 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -157,13 +162,27 @@ public synchronized void handleSnapshot(SnapshotReader reader) { public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { if (newLeader.isLeader(nodeId)) { log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", -committed, newLeader); +committed, newLeader); uncommitted = committed; claimedEpoch = OptionalInt.of(newLeader.epoch()); } else { log.debug("Counter uncommitted value reset after resigning leadership"); uncommitted = -1; claimedEpoch = OptionalInt.empty(); } +handleSnapshotCalled = false; +handleSnapshotCalls = 0; +} + +public boolean isLeader() { +return this.client.leaderAndEpoch().isLeader(nodeId); Review comment: https://issues.apache.org/jira/browse/KAFKA-13006
[jira] [Created] (KAFKA-13006) Remove the method RaftClient.leaderAndEpoch
Jose Armando Garcia Sancio created KAFKA-13006: -- Summary: Remove the method RaftClient.leaderAndEpoch Key: KAFKA-13006 URL: https://issues.apache.org/jira/browse/KAFKA-13006 Project: Kafka Issue Type: Sub-task Reporter: Jose Armando Garcia Sancio There are semantic differences between {{RaftClient.leaderAndEpoch}} and {{RaftClient.Listener.handleLeaderChange}}, especially when the Raft client transitions from follower to leader. To simplify the API, I think that we should remove the method {{RaftClient.leaderAndEpoch}}.
[GitHub] [kafka] cmccabe commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)
cmccabe commented on a change in pull request #10931: URL: https://github.com/apache/kafka/pull/10931#discussion_r660141890 ## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.TopicRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + + +/** + * Represents changes to the topics in the metadata image. + */ +public final class TopicsDelta { +private final TopicsImage image; + +/** + * A map from topic IDs to the topic deltas for each topic. Topics which have been + * deleted will not appear in this map. + */ +private final Map changedTopics = new HashMap<>(); + +/** + * The IDs of topics that exist in the image but that have been deleted. 
Note that if + * a topic does not exist in the image, it will also not exist in this set. Topics + * that are created and then deleted within the same delta will leave no trace. + */ +private final Set deletedTopicIds = new HashSet<>(); Review comment: Sorry, I can't quite follow the question. It is certainly the case that we replay each record exactly once, though, if that's the question?
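A rough sketch of the bookkeeping the `TopicsDelta` Javadoc describes, under stated assumptions: `SimpleTopicsDelta`, `replayTopicCreated`, and `replayTopicRemoved` are hypothetical, a topic name stands in for a real per-topic delta, and `java.util.UUID` stands in for Kafka's `Uuid`. Each record is replayed once; a removal only lands in `deletedTopicIds` when the topic existed in the base image, so a topic created and deleted within the same delta leaves no trace.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified model of the delta bookkeeping (not the real TopicsDelta).
final class SimpleTopicsDelta {
    private final Set<UUID> imageTopicIds;                         // topics in the base image
    private final Map<UUID, String> changedTopics = new HashMap<>(); // id -> topic name
    private final Set<UUID> deletedTopicIds = new HashSet<>();

    SimpleTopicsDelta(Set<UUID> imageTopicIds) {
        this.imageTopicIds = imageTopicIds;
    }

    // Replay of a TopicRecord-like event: the topic now appears in changedTopics.
    void replayTopicCreated(UUID id, String name) {
        changedTopics.put(id, name);
    }

    // Replay of a RemoveTopicRecord-like event.
    void replayTopicRemoved(UUID id) {
        changedTopics.remove(id);
        // Only topics present in the base image are remembered as deleted.
        if (imageTopicIds.contains(id)) {
            deletedTopicIds.add(id);
        }
    }

    Map<UUID, String> changedTopics() {
        return changedTopics;
    }

    Set<UUID> deletedTopicIds() {
        return deletedTopicIds;
    }
}
```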
[GitHub] [kafka] jsancio commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…
jsancio commented on a change in pull request #10932: URL: https://github.com/apache/kafka/pull/10932#discussion_r660140438 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -38,6 +38,9 @@ private OptionalInt claimedEpoch = OptionalInt.empty(); private long lastOffsetSnapshotted = -1; +private int handleSnapshotCalls = 0; +private boolean handleSnapshotCalled = false; Review comment: We should only store `handleSnapshotCalls` since `handleSnapshotCalled` is always equal to `handleSnapshotCalls > 0`. ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -157,13 +162,27 @@ public synchronized void handleSnapshot(SnapshotReader reader) { public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { if (newLeader.isLeader(nodeId)) { log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", -committed, newLeader); +committed, newLeader); uncommitted = committed; claimedEpoch = OptionalInt.of(newLeader.epoch()); } else { log.debug("Counter uncommitted value reset after resigning leadership"); uncommitted = -1; claimedEpoch = OptionalInt.empty(); } +handleSnapshotCalled = false; +handleSnapshotCalls = 0; +} + +public boolean isLeader() { +return this.client.leaderAndEpoch().isLeader(nodeId); Review comment: I think this is the issue you reported in the Jira. The `RaftClient.Listener` should not use `RaftClient.leaderAndEpoch` to determine if it is the leader. It should instead use `RaftClient.Listener.handleLeaderChange`. For this state machine `ReplicatedCounter` we should look at the `claimedEpoch` variable. I am going to create an issue to remove this method. cc @hachikuji
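A trimmed-down sketch of the two suggestions in this review, assuming a hypothetical `CounterListenerState` class rather than the real `ReplicatedCounter`: only the call count is stored, the boolean is derived as `handleSnapshotCalls > 0`, and leadership is answered from the epoch claimed in `handleLeaderChange` instead of by querying `RaftClient.leaderAndEpoch`. The `handleLeaderChange` signature below is simplified for illustration and is not the real `LeaderAndEpoch`-based one.

```java
import java.util.OptionalInt;

// Hypothetical, trimmed-down listener state (not the real ReplicatedCounter).
final class CounterListenerState {
    private OptionalInt claimedEpoch = OptionalInt.empty();
    // Single source of truth: "was handleSnapshot called?" is derived from this count.
    private int handleSnapshotCalls = 0;

    // Simplified stand-in for RaftClient.Listener#handleLeaderChange.
    synchronized void handleLeaderChange(OptionalInt leaderId, int epoch, int localNodeId) {
        if (leaderId.isPresent() && leaderId.getAsInt() == localNodeId) {
            claimedEpoch = OptionalInt.of(epoch);
        } else {
            claimedEpoch = OptionalInt.empty();
        }
        handleSnapshotCalls = 0;
    }

    synchronized void handleSnapshot() {
        handleSnapshotCalls++;
    }

    synchronized boolean handleSnapshotCalled() {
        return handleSnapshotCalls > 0;
    }

    // Leadership as this listener has been notified of it, not as the client reports it now.
    synchronized boolean isLeader() {
        return claimedEpoch.isPresent();
    }
}
```

Answering `isLeader` from the listener's own notifications avoids the window in which the client already considers itself leader but the listener has not yet been told.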
[jira] [Created] (KAFKA-13005) Support JBOD in kraft mode
Colin McCabe created KAFKA-13005: Summary: Support JBOD in kraft mode Key: KAFKA-13005 URL: https://issues.apache.org/jira/browse/KAFKA-13005 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe
[GitHub] [kafka] niket-goel commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot
niket-goel commented on a change in pull request #10899: URL: https://github.com/apache/kafka/pull/10899#discussion_r660135431 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -204,35 +207,103 @@ private void completeCurrentBatch() { currentBatch = null; } -public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { +private void appendControlMessage( +Supplier supplier, +ByteBuffer buffer +) { appendLock.lock(); try { forceDrain(); -ByteBuffer buffer = memoryPool.tryAllocate(256); -if (buffer != null) { -MemoryRecords data = MemoryRecords.withLeaderChangeMessage( -this.nextOffset, -currentTimeMs, -this.epoch, -buffer, -leaderChangeMessage -); -completed.add(new CompletedBatch<>( -nextOffset, -1, -data, -memoryPool, -buffer -)); -nextOffset += 1; -} else { -throw new IllegalStateException("Could not allocate buffer for the leader change record."); -} +completed.add(new CompletedBatch<>( +nextOffset, +1, +supplier.get(), +memoryPool, +buffer +)); +nextOffset += 1; } finally { appendLock.unlock(); } } +/** + * Append a {@link LeaderChangeMessage} record to the batch + * + * @param @LeaderChangeMessage The message to append + * @param @currentTimeMs The timestamp of message generation + * @throws IllegalStateException on failure to allocate a buffer for the record + */ +public void appendLeaderChangeMessage( +LeaderChangeMessage leaderChangeMessage, +long currentTimeMs +) { +ByteBuffer buffer = memoryPool.tryAllocate(256); Review comment: The code organization gets funny if we do that because the byte buffer is needed for the MemoryRecords API as well. Will sync with you offline on this.
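A hedged sketch of why the buffer is allocated outside `appendControlMessage`: the same buffer is both written by the record-building supplier and retained by the completed batch so it can later go back to the pool. `SimplePool`, `Completed`, `ControlAppender`, and `appendControl` are hypothetical; the real `BatchAccumulator` also takes `appendLock` and calls `forceDrain()`, which are omitted here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical pool: returns null when no memory is available.
interface SimplePool {
    ByteBuffer tryAllocate(int size);
}

// Hypothetical completed batch: keeps the encoded data and the buffer backing it.
final class Completed<T> {
    final long baseOffset;
    final T data;
    final ByteBuffer buffer;

    Completed(long baseOffset, T data, ByteBuffer buffer) {
        this.baseOffset = baseOffset;
        this.data = data;
        this.buffer = buffer;
    }
}

final class ControlAppender<T> {
    private final SimplePool pool;
    private final List<Completed<T>> completed = new ArrayList<>();
    private long nextOffset = 0;

    ControlAppender(SimplePool pool) {
        this.pool = pool;
    }

    // Shared step: build the records lazily and record the batch at the next offset.
    private void appendControlMessage(Supplier<T> supplier, ByteBuffer buffer) {
        completed.add(new Completed<>(nextOffset, supplier.get(), buffer));
        nextOffset += 1;
    }

    // Per-message step: allocate once, then hand the same buffer to both the
    // encoder (through the supplier) and the completed batch.
    void appendControl(Function<ByteBuffer, T> encoder, String recordName) {
        ByteBuffer buffer = pool.tryAllocate(256);
        if (buffer == null) {
            throw new IllegalStateException("Could not allocate buffer for the " + recordName + " record.");
        }
        appendControlMessage(() -> encoder.apply(buffer), buffer);
    }

    List<Completed<T>> completed() {
        return completed;
    }
}
```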
[GitHub] [kafka] niket-goel commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot
niket-goel commented on a change in pull request #10899: URL: https://github.com/apache/kafka/pull/10899#discussion_r660134385 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -204,35 +207,103 @@ private void completeCurrentBatch() { currentBatch = null; } -public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { +private void appendControlMessage( +Supplier supplier, +ByteBuffer buffer +) { appendLock.lock(); try { forceDrain(); -ByteBuffer buffer = memoryPool.tryAllocate(256); -if (buffer != null) { -MemoryRecords data = MemoryRecords.withLeaderChangeMessage( -this.nextOffset, -currentTimeMs, -this.epoch, -buffer, -leaderChangeMessage -); -completed.add(new CompletedBatch<>( -nextOffset, -1, -data, -memoryPool, -buffer -)); -nextOffset += 1; -} else { -throw new IllegalStateException("Could not allocate buffer for the leader change record."); -} +completed.add(new CompletedBatch<>( +nextOffset, +1, +supplier.get(), +memoryPool, +buffer +)); +nextOffset += 1; } finally { appendLock.unlock(); } } +/** + * Append a {@link LeaderChangeMessage} record to the batch + * + * @param @LeaderChangeMessage The message to append + * @param @currentTimeMs The timestamp of message generation + * @throws IllegalStateException on failure to allocate a buffer for the record + */ +public void appendLeaderChangeMessage( +LeaderChangeMessage leaderChangeMessage, +long currentTimeMs +) { +ByteBuffer buffer = memoryPool.tryAllocate(256); +if (buffer != null) { +appendControlMessage( +() -> MemoryRecords.withLeaderChangeMessage( +this.nextOffset, +currentTimeMs, +this.epoch, +buffer, +leaderChangeMessage), +buffer); +} else { +throw new IllegalStateException("Could not allocate buffer for the leader change record."); +} +} + + +/** + * Append a {@link SnapshotHeaderRecord} record to the batch + * + * @param @SnapshotHeaderRecord The message to append + * @throws IllegalStateException on failure to 
allocate a buffer for the record + */ +public void appendSnapshotHeaderMessage( +SnapshotHeaderRecord snapshotHeaderRecord, +long currentTimeMs +) { +ByteBuffer buffer = memoryPool.tryAllocate(256); +if (buffer != null) { +appendControlMessage( +() -> MemoryRecords.withSnapshotHeaderRecord( +this.nextOffset, +currentTimeMs, +this.epoch, +buffer, +snapshotHeaderRecord), +buffer); +} else { +throw new IllegalStateException("Could not allocate buffer for the metadata snapshot header record."); +} +} + +/** + * Append a {@link SnapshotFooterRecord} record to the batch + * + * @param @SnapshotFooterRecord The message to append + * @throws IllegalStateException on failure to allocate a buffer for the record + */ +public void appendSnapshotFooterMessage( +SnapshotFooterRecord snapshotFooterRecord Review comment: Will do ## File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java ## @@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception { ); } +private int validateDelimiters( +RawSnapshotReader snapshot, +long lastContainedLogTime +) { +assertNotEquals(0, snapshot.sizeInBytes()); + +int countRecords = 0; + +Iterator recordBatches = Utils.covariantCast(snapshot.records().batchIterator()); + +assertEquals(Boolean.TRUE, recordBatches.hasNext()); Review comment: thanks ## File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java ## @@ -22,25 +22,65 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Random; +import java.util.Iterator; import java.util.Set; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.message.SnapshotHeaderRecord; import
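The `validateDelimiters` test helper is cut off above. A minimal sketch of the property such a check could assert, assuming a hypothetical `SnapshotEntryKind` enum in place of real record batches: the snapshot starts with a header, ends with a footer, and contains only data records in between.

```java
import java.util.List;

// Hypothetical record kinds in a delimited snapshot stream.
enum SnapshotEntryKind { HEADER, DATA, FOOTER }

final class DelimiterCheck {
    // Returns the number of DATA entries; throws if the stream is not HEADER, DATA*, FOOTER.
    static int countBetweenDelimiters(List<SnapshotEntryKind> entries) {
        if (entries.size() < 2) {
            throw new IllegalArgumentException("A delimited snapshot needs a header and a footer");
        }
        if (entries.get(0) != SnapshotEntryKind.HEADER) {
            throw new IllegalArgumentException("First entry must be HEADER, found " + entries.get(0));
        }
        SnapshotEntryKind last = entries.get(entries.size() - 1);
        if (last != SnapshotEntryKind.FOOTER) {
            throw new IllegalArgumentException("Last entry must be FOOTER, found " + last);
        }
        int count = 0;
        for (SnapshotEntryKind kind : entries.subList(1, entries.size() - 1)) {
            if (kind != SnapshotEntryKind.DATA) {
                throw new IllegalArgumentException("Unexpected delimiter inside snapshot: " + kind);
            }
            count++;
        }
        return count;
    }
}
```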
[GitHub] [kafka] niket-goel commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot
niket-goel commented on a change in pull request #10899: URL: https://github.com/apache/kafka/pull/10899#discussion_r660133212 ## File path: clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java ## @@ -26,19 +28,49 @@ */ public class ControlRecordUtils { -public static final short LEADER_CHANGE_SCHEMA_VERSION = new LeaderChangeMessage().highestSupportedVersion(); +public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = new LeaderChangeMessage().highestSupportedVersion(); +public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = new SnapshotHeaderRecord().highestSupportedVersion(); +public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = new SnapshotFooterRecord().highestSupportedVersion(); public static LeaderChangeMessage deserializeLeaderChangeMessage(Record record) { ControlRecordType recordType = ControlRecordType.parse(record.key()); if (recordType != ControlRecordType.LEADER_CHANGE) { throw new IllegalArgumentException( -"Expected LEADER_CHANGE control record type(3), but found " + recordType.toString()); +"Expected LEADER_CHANGE control record type(2), but found " + recordType.toString()); } return deserializeLeaderChangeMessage(record.value().duplicate()); } public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) { Review comment: Will cut a JIRA. ## File path: clients/src/main/resources/common/message/SnapshotHeaderRecord.json ## @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "SnapshotHeaderRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{"name": "Version", "type": "int16", "versions": "0+", + "about": "The version of the snapshot header record"}, +{"name": "LastContainedLogTime", "type": "int64", "versions": "0+", + "about": "The append time of the highest record contained in this snapshot"} Review comment: That sounds better. Will use this. ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -345,7 +345,8 @@ void createSnapshotGenerator(long committedOffset, int committedEpoch) { } Optional> writer = raftClient.createSnapshot( committedOffset, -committedEpoch +committedEpoch, +0/*KAFKA-12997*/ Review comment: ack. ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -106,7 +106,7 @@ public synchronized void handleCommit(BatchReader reader) { lastCommittedEpoch, lastOffsetSnapshotted ); -Optional> snapshot = client.createSnapshot(lastCommittedOffset, lastCommittedEpoch); +Optional> snapshot = client.createSnapshot(lastCommittedOffset, lastCommittedEpoch, 0/*KAFKA-12997*/); Review comment: ack
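A small sketch of the check-then-decode pattern used in `ControlRecordUtils`, applied to the two fields in `SnapshotHeaderRecord.json` (`Version` as int16, `LastContainedLogTime` as int64). The fixed-width layout below is an assumption for illustration only; the real code delegates to the generated message class, whose flexible-version encoding differs. `ControlType`, `SnapshotHeader`, and `ControlDecoding` are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical control record types (names only; no wire codes implied).
enum ControlType { LEADER_CHANGE, SNAPSHOT_HEADER, SNAPSHOT_FOOTER }

// Hypothetical decoded header holding the two v0 fields from the schema.
final class SnapshotHeader {
    final short version;
    final long lastContainedLogTime;

    SnapshotHeader(short version, long lastContainedLogTime) {
        this.version = version;
        this.lastContainedLogTime = lastContainedLogTime;
    }
}

final class ControlDecoding {
    static SnapshotHeader deserializeSnapshotHeader(ControlType type, ByteBuffer value) {
        // Reject the record before touching the payload if the key says it is another type.
        if (type != ControlType.SNAPSHOT_HEADER) {
            throw new IllegalArgumentException("Expected SNAPSHOT_HEADER control record type, but found " + type);
        }
        // Read from a duplicate so the caller's buffer position is left unchanged.
        ByteBuffer data = value.duplicate();
        short version = data.getShort();          // assumed fixed-width int16
        long lastContainedLogTime = data.getLong(); // assumed fixed-width int64
        return new SnapshotHeader(version, lastContainedLogTime);
    }
}
```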
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660127887 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -193,18 +197,22 @@ class CachedPartition(val topic: String, * Each fetch session is protected by its own lock, which must be taken before mutable * fields are read or modified. This includes modification of the session partition map. * - * @param id The unique fetch session ID. - * @param privileged True if this session is privileged. Sessions crated by followers - * are privileged; sesssion created by consumers are not. - * @param partitionMap The CachedPartitionMap. - * @param creationMs The time in milliseconds when this session was created. - * @param lastUsedMs The last used time in milliseconds. This should only be updated by - * FetchSessionCache#touch. - * @param epochThe fetch session sequence number. + * @param id The unique fetch session ID. + * @param privileged True if this session is privileged. Sessions crated by followers + * are privileged; session created by consumers are not. + * @param partitionMap The CachedPartitionMap. + * @param usesTopicIds True if this session is using topic IDs + * @param sessionTopicIdsThe mapping from topic name to topic ID for topics in the session. + * @param creationMs The time in milliseconds when this session was created. + * @param lastUsedMs The last used time in milliseconds. This should only be updated by + * FetchSessionCache#touch. + * @param epoch The fetch session sequence number. */ class FetchSession(val id: Int, val privileged: Boolean, val partitionMap: FetchSession.CACHE_MAP, + val usesTopicIds: Boolean, + val sessionTopicIds: FetchSession.TOPIC_ID_MAP, Review comment: Taking a second look, seems like we just use partitionMap.size. Not sure if it is useful to have sessionTopicIds size (and if the whole map is too much). I'm thinking maybe just including the usesTopicIds boolean. 
[jira] [Created] (KAFKA-13004) Trogdor performance decreases sharply with large amounts of tasks.
Scott Hendricks created KAFKA-13004: --- Summary: Trogdor performance decreases sharply with large amounts of tasks. Key: KAFKA-13004 URL: https://issues.apache.org/jira/browse/KAFKA-13004 Project: Kafka Issue Type: Bug Components: tools Environment: We run our Trogdor clusters within Kubernetes. Reporter: Scott Hendricks Assignee: Scott Hendricks As part of my performance tests, I am running 3000 workloads within Trogdor. The clients seem to be able to handle this fine, but when I go to reset and run the same test again, Trogdor seems sluggish. Here are the steps to reproduce this: # Run 3000 workloads in Trogdor, a combination of Produce/Consume workloads. # Wait for the workloads to complete. # Run the DELETE API calls to destroy all 3000 workloads to reset for the next run. # Confirm via the API that there are no workloads defined in the system. # Run an additional 3000 workloads in Trogdor similar to step 1. The Coordinator takes a long time to start the second batch of 3000. There seems to be some performance issue in the framework that will take a while to debug. At this point I don't know if it only affects the Coordinator, or if the Agents are affected as well. I do not currently have the time to look into this, so I am creating this issue to track it. The workaround I am employing is destroying and recreating the Trogdor cluster in between test runs.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660122565 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadataThe request metadata. + * @param versionThe version of the request, * @param fetchData The partition data from the fetch request. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, + private val version: Short, private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { override def getFetchOffset(part: TopicPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) - override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { -fetchData.forEach(fun(_, _)) + override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = { +fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { -FetchResponse.sizeOf(versionId, updates.entrySet.iterator) +FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = { -def createNewSession: FetchSession.CACHE_MAP = { +var error = Errors.NONE +def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) + val sessionTopicIds = new util.HashMap[String, 
Uuid](updates.size) updates.forEach { (part, respData) => +if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) + error = Errors.INCONSISTENT_TOPIC_ID val reqData = fetchData.get(part) -cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData)) +val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID) +cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData)) +if (id != Uuid.ZERO_UUID) + sessionTopicIds.put(part.topic, id) } - cachedPartitions + (cachedPartitions, sessionTopicIds) } val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower, -updates.size, () => createNewSession) +updates.size, version, () => createNewSession) debug(s"Full fetch context with session id $responseSessionId returning " + s"${partitionsToLogString(updates.keySet)}") -FetchResponse.of(Errors.NONE, 0, responseSessionId, updates) +FetchResponse.of(error, 0, responseSessionId, updates, topicIds) Review comment: I guess the only issue with using FetchRequest.getErrorResponse is that we may have different topics in the response than in the request. SessionErrorContext deals with this by simply having an empty response besides the top level error. I'm wondering if we should do something like this. (Likewise, with the UNKNOWN_TOPIC_ID error, should we also just send back an empty response?)
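A simplified Java sketch of the `createNewSession` loop in the Scala diff above, with hypothetical names (`PartitionUpdate`, `SessionBuild`, `Err`) and `java.util.UUID` standing in for `Uuid`: any partition reporting INCONSISTENT_TOPIC_ID raises the top-level error, a missing topic ID defaults to the zero UUID, and only non-zero IDs are recorded in the session's topic-ID map. `responsePartitions` illustrates, as one hypothetical option, the empty-response idea raised in the comment, similar to how SessionErrorContext replies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical per-partition result from the fetch.
final class PartitionUpdate {
    final String topic;
    final int partition;
    final SessionBuild.Err error;

    PartitionUpdate(String topic, int partition, SessionBuild.Err error) {
        this.topic = topic;
        this.partition = partition;
        this.error = error;
    }
}

final class SessionBuild {
    enum Err { NONE, INCONSISTENT_TOPIC_ID }

    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // "topic-partition" -> topic ID (zero when unknown), standing in for the cached partitions.
    final Map<String, UUID> cachedPartitions = new LinkedHashMap<>();
    // Only topics with a real (non-zero) ID are recorded for the session.
    final Map<String, UUID> sessionTopicIds = new HashMap<>();
    Err error = Err.NONE;

    static SessionBuild build(List<PartitionUpdate> updates, Map<String, UUID> topicIds) {
        SessionBuild session = new SessionBuild();
        for (PartitionUpdate u : updates) {
            if (u.error == Err.INCONSISTENT_TOPIC_ID) {
                session.error = Err.INCONSISTENT_TOPIC_ID;
            }
            UUID id = topicIds.getOrDefault(u.topic, ZERO_UUID);
            session.cachedPartitions.put(u.topic + "-" + u.partition, id);
            if (!id.equals(ZERO_UUID)) {
                session.sessionTopicIds.put(u.topic, id);
            }
        }
        return session;
    }

    // Hypothetical option: with a top-level error, return no partition data at all.
    Map<String, UUID> responsePartitions() {
        return error == Err.NONE ? cachedPartitions : Collections.<String, UUID>emptyMap();
    }
}
```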
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660122565 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadataThe request metadata. + * @param versionThe version of the request, * @param fetchData The partition data from the fetch request. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, + private val version: Short, private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { override def getFetchOffset(part: TopicPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) - override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { -fetchData.forEach(fun(_, _)) + override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = { +fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { -FetchResponse.sizeOf(versionId, updates.entrySet.iterator) +FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = { -def createNewSession: FetchSession.CACHE_MAP = { +var error = Errors.NONE +def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) + val sessionTopicIds = new util.HashMap[String, 
Uuid](updates.size) updates.forEach { (part, respData) => +if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) + error = Errors.INCONSISTENT_TOPIC_ID val reqData = fetchData.get(part) -cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData)) +val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID) +cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData)) +if (id != Uuid.ZERO_UUID) + sessionTopicIds.put(part.topic, id) } - cachedPartitions + (cachedPartitions, sessionTopicIds) } val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower, -updates.size, () => createNewSession) +updates.size, version, () => createNewSession) debug(s"Full fetch context with session id $responseSessionId returning " + s"${partitionsToLogString(updates.keySet)}") -FetchResponse.of(Errors.NONE, 0, responseSessionId, updates) +FetchResponse.of(error, 0, responseSessionId, updates, topicIds) Review comment: I guess the only issue with using FetchRequest.getErrorResponse is that we may have different topics in the response than in the request.
[GitHub] [kafka] mjsax merged pull request #10894: KAFKA-12951: restore must terminate for tx global topic
mjsax merged pull request #10894: URL: https://github.com/apache/kafka/pull/10894
[GitHub] [kafka] ccding commented on pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on pull request #10763: URL: https://github.com/apache/kafka/pull/10763#issuecomment-870025479 @junrao Thanks for the review. I have addressed the comments. Please take a look.
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r660101025 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,58 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction or segment split operation. We can +// simply rename them to regular segment files. But, before renaming, we should figure out which +// segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. Review comment: fixed ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,58 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction or segment split operation. We can +// simply rename them to regular segment files. But, before renaming, we should figure out which +// segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. +// We store segments that require renaming in this code block, and do the actual renaming later. 
+var minSwapFileOffset = Long.MaxValue +var maxSwapFileOffset = Long.MinValue +swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f => + val baseOffset = offsetFromFile(f) + val segment = LogSegment.open(f.getParentFile, +baseOffset = baseOffset, +params.config, +time = params.time, +fileSuffix = Log.SwapFileSuffix) + info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.") + minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset) + maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset) +} + +// Second pass: delete segments that are between minSwapFileOffset and maxSwapFileOffset. As +// discussed above, these segments were compacted or split but haven't been renamed to .delete +// before shutting down the broker. +for (file <- params.dir.listFiles if file.isFile) { + try { +if (!file.getName.endsWith(SwapFileSuffix)) { + val offset = offsetFromFile(file) + if (offset >= minSwapFileOffset && offset <= maxSwapFileOffset) { Review comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
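The swap-file recovery step in the hunk above can be illustrated with a small standalone sketch. It is not the LogLoader code: the `SwapSegment` type and the `segmentsToDelete` helper are hypothetical, and segments are reduced to their base offsets. It shows how the `[minSwapFileOffset, maxSwapFileOffset]` range is computed from the recovered .swap segments and then used to pick the regular segments they replace.

```java
import java.util.ArrayList;
import java.util.List;

final class SwapRecoverySketch {
    // Hypothetical stand-in for a recovered .swap segment: its base offset and
    // the last offset recorded in its offset index.
    static final class SwapSegment {
        final long baseOffset;
        final long lastOffset;

        SwapSegment(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Returns the base offsets of regular segments inside the offset range covered by
    // the .swap segments; these are assumed to have been compacted or split but not
    // yet renamed before the broker stopped.
    static List<Long> segmentsToDelete(List<SwapSegment> swaps, List<Long> segmentBaseOffsets) {
        long minSwapFileOffset = Long.MAX_VALUE;
        long maxSwapFileOffset = Long.MIN_VALUE;
        for (SwapSegment s : swaps) {
            minSwapFileOffset = Math.min(s.baseOffset, minSwapFileOffset);
            maxSwapFileOffset = Math.max(s.lastOffset, maxSwapFileOffset);
        }
        List<Long> toDelete = new ArrayList<>();
        for (long offset : segmentBaseOffsets) {
            // With no .swap files, min > max, so the range is empty and nothing is selected.
            if (offset >= minSwapFileOffset && offset <= maxSwapFileOffset) {
                toDelete.add(offset);
            }
        }
        return toDelete;
    }
}
```

The upper bound comes from the last offset in each .swap segment's index rather than its base offset, so one .swap segment that absorbed several regular segments covers all of their base offsets.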
[GitHub] [kafka] stan-confluent commented on a change in pull request #10915: Enable connecting VS Code remote debugger
stan-confluent commented on a change in pull request #10915: URL: https://github.com/apache/kafka/pull/10915#discussion_r660083163 ## File path: tests/README.md ## @@ -51,6 +51,40 @@ bash tests/docker/ducker-ak up -j 'openjdk:11'; tests/docker/run_tests.sh ``` REBUILD="t" bash tests/docker/run_tests.sh ``` +* Debug tests in VS Code: + - Run test with `--debug` flag (can be before or after file name): Review comment: @omkreddy I've updated the run_tests.sh file to pass ducktape args after `--` if any are present. Can you please take another look? I've also run shellcheck and fixed the violations it found in the changed code. Thanks!
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660068452 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}. Possible duplicate request.") -FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) +FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { +var error = Errors.NONE // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent +// It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error. val partitionIter = new PartitionIterator(updates.entrySet.iterator, true) while (partitionIter.hasNext) { - partitionIter.next() + if (partitionIter.next().getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) +error = Errors.INCONSISTENT_TOPIC_ID Review comment: I'm still not sure I follow "pending fetch request could still reference the outdated Partition object and therefore miss the topicId change". My understanding is that the log is the source of truth: we read from the log if the topic ID matches and do not read from it if it doesn't. I can see us returning an error spuriously if the partition hasn't been updated in time, but I don't see how a stale partition would let us read from the log. Or are you referring to the getPartitionOrException(tp) call picking up a stale partition, so that both the request and the partition are stale? In that case, we will read from the log but identify the data with its correct ID, and the client will handle the response accordingly.
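The top-level error logic in this hunk reduces to a fold over the per-partition errors. A minimal sketch, with a hypothetical `Err` enum standing in for Kafka's `Errors` and a plain map standing in for the response partitions; the real loop also prunes unchanged partitions through `PartitionIterator`, which is omitted here:

```java
import java.util.Map;

final class TopLevelErrorSketch {
    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
    enum Err { NONE, OFFSET_OUT_OF_RANGE, INCONSISTENT_TOPIC_ID }

    // The top-level error becomes INCONSISTENT_TOPIC_ID if any partition reported it;
    // any other per-partition error leaves the top-level error as NONE.
    static Err topLevelError(Map<String, Err> partitionErrors) {
        Err topLevelError = Err.NONE;
        for (Err e : partitionErrors.values()) {
            if (e == Err.INCONSISTENT_TOPIC_ID) {
                topLevelError = Err.INCONSISTENT_TOPIC_ID;
            }
        }
        return topLevelError;
    }
}
```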
[GitHub] [kafka] ueisele commented on a change in pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port
ueisele commented on a change in pull request #10935: URL: https://github.com/apache/kafka/pull/10935#discussion_r660063967 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -283,7 +283,7 @@ class BrokerServer( networkListeners.add(new Listener(). setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host). setName(ep.listenerName.value()). - setPort(socketServer.boundPort(ep.listenerName)). + setPort(ep.port). Review comment: Yes, sure. I will add a test.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660062652 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadataThe request metadata. + * @param versionThe version of the request, * @param fetchData The partition data from the fetch request. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, + private val version: Short, private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { override def getFetchOffset(part: TopicPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) - override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { -fetchData.forEach(fun(_, _)) + override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = { +fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { -FetchResponse.sizeOf(versionId, updates.entrySet.iterator) +FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = { -def createNewSession: FetchSession.CACHE_MAP = { +var error = Errors.NONE +def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) + val sessionTopicIds = new util.HashMap[String, 
Uuid](updates.size) updates.forEach { (part, respData) => +if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) + error = Errors.INCONSISTENT_TOPIC_ID val reqData = fetchData.get(part) -cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData)) +val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID) +cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData)) +if (id != Uuid.ZERO_UUID) + sessionTopicIds.put(part.topic, id) } - cachedPartitions + (cachedPartitions, sessionTopicIds) } val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower, -updates.size, () => createNewSession) +updates.size, version, () => createNewSession) debug(s"Full fetch context with session id $responseSessionId returning " + s"${partitionsToLogString(updates.keySet)}") -FetchResponse.of(Errors.NONE, 0, responseSessionId, updates) +FetchResponse.of(error, 0, responseSessionId, updates, topicIds) Review comment: I think this goes back to the question of whether it is useful for us to have information on the specific partition that failed. If we do this, should we also return the error values for the other fields as we do in FetchRequest.getErrorResponse?
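The `createNewSession` change records a topic ID in the session only when the broker knows a non-zero ID for that topic. A minimal sketch of that filtering, assuming `java.util.UUID` in place of Kafka's `Uuid` (with the all-zero UUID standing in for `Uuid.ZERO_UUID`) and a list of topic names in place of the response partitions; `buildSessionTopicIds` is a hypothetical helper:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class SessionTopicIdsSketch {
    // Stand-in for Uuid.ZERO_UUID, which marks "no topic ID known".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Builds the session's topic name -> topic ID map. Partitions of the same topic
    // share one entry; topics without a known ID are left out entirely.
    static Map<String, UUID> buildSessionTopicIds(List<String> partitionTopics,
                                                  Map<String, UUID> topicIds) {
        Map<String, UUID> sessionTopicIds = new HashMap<>(partitionTopics.size());
        for (String topic : partitionTopics) {
            UUID id = topicIds.getOrDefault(topic, ZERO_UUID);
            if (!id.equals(ZERO_UUID)) {
                sessionTopicIds.put(topic, id);
            }
        }
        return sessionTopicIds;
    }
}
```

Scala's `!=` compares by value (it delegates to `equals`), which is why the Java version uses `equals` instead of `!=`.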
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660061358 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -193,18 +197,22 @@ class CachedPartition(val topic: String, * Each fetch session is protected by its own lock, which must be taken before mutable * fields are read or modified. This includes modification of the session partition map. * - * @param id The unique fetch session ID. - * @param privileged True if this session is privileged. Sessions crated by followers - * are privileged; sesssion created by consumers are not. - * @param partitionMap The CachedPartitionMap. - * @param creationMs The time in milliseconds when this session was created. - * @param lastUsedMs The last used time in milliseconds. This should only be updated by - * FetchSessionCache#touch. - * @param epochThe fetch session sequence number. + * @param id The unique fetch session ID. + * @param privileged True if this session is privileged. Sessions crated by followers + * are privileged; session created by consumers are not. + * @param partitionMap The CachedPartitionMap. + * @param usesTopicIds True if this session is using topic IDs + * @param sessionTopicIdsThe mapping from topic name to topic ID for topics in the session. + * @param creationMs The time in milliseconds when this session was created. + * @param lastUsedMs The last used time in milliseconds. This should only be updated by + * FetchSessionCache#touch. + * @param epoch The fetch session sequence number. */ class FetchSession(val id: Int, val privileged: Boolean, val partitionMap: FetchSession.CACHE_MAP, + val usesTopicIds: Boolean, + val sessionTopicIds: FetchSession.TOPIC_ID_MAP, Review comment: I suppose it won't hurt :) ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. 
* @param cache The fetch session cache. * @param reqMetadataThe request metadata. + * @param versionThe version of the request, * @param fetchData The partition data from the fetch request. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, + private val version: Short, Review comment: We can do that to make things clearer.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660061129 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -231,20 +239,31 @@ class FetchSession(val id: Int, def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) } def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized { -Option(partitionMap.find(new CachedPartition(topicPartition))).map(_.fetchOffset) +Option(partitionMap.find(new CachedPartition(topicPartition, + sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset) } type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. def update(fetchData: FetchSession.REQ_MAP, toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL +val inconsistentTopicIds = new TL fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID. + // If the topic already existed, check that its ID is consistent. + val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID) + val newCachedPart = new CachedPartition(topicPart, id, reqData) + if (id != Uuid.ZERO_UUID) { +val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id) Review comment: If a topic ID changes, the FetchSession will become a FetchErrorSession and close. I can change this to putIfAbsent if it makes things clearer, but all of this state goes away once the error occurs and the session closes.
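The `put` versus `putIfAbsent` question in this thread comes down to whether the first topic ID recorded for a name can be overwritten. A minimal sketch of the `putIfAbsent` variant, again with `java.util.UUID` standing in for Kafka's `Uuid`; the `recordTopicId` helper is hypothetical:

```java
import java.util.Map;
import java.util.UUID;

final class TopicIdConsistencySketch {
    // Records the topic ID the first time a topic is seen. Returns true if the ID is
    // consistent with what the session already holds, false otherwise. Because
    // putIfAbsent never overwrites, the session keeps the original ID even when a
    // mismatch is detected.
    static boolean recordTopicId(Map<String, UUID> sessionTopicIds, String topic, UUID id) {
        UUID prev = sessionTopicIds.putIfAbsent(topic, id);
        return prev == null || prev.equals(id);
    }
}
```

With plain `put`, the mismatching call would replace the stored ID; as the reply notes, that state is discarded anyway once the session errors out and closes, so the choice mainly affects readability.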
[GitHub] [kafka] jsancio commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
jsancio commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r660023057 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -514,6 +514,16 @@ public UnattachedState unattachedStateOrThrow() { throw new IllegalStateException("Expected to be Leader, but current state is " + state); } +@SuppressWarnings("unchecked") +public Optional> maybeLeaderState() { +EpochState state = this.state; +if (state instanceof LeaderState) { +return Optional.of((LeaderState) state); +} else { +return Optional.empty(); +} +} Review comment: Thank you. You read my mind. I was thinking of something similar after I left my previous comment.
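The `maybeLeaderState` accessor follows a common pattern: read the mutable state field once into a local, then test and cast that local so both operations see the same object. A self-contained sketch with hypothetical minimal state types; the generic parameters are this sketch's own, since the archived diff does not show them:

```java
import java.util.Optional;

final class MaybeLeaderStateSketch {
    // Hypothetical minimal stand-ins for the raft epoch states.
    interface EpochState {}

    static final class LeaderState<T> implements EpochState {}

    static final class FollowerState implements EpochState {}

    private EpochState state;

    MaybeLeaderStateSketch(EpochState initial) {
        this.state = initial;
    }

    void transitionTo(EpochState next) {
        this.state = next;
    }

    // Copy the field into a local so the instanceof check and the cast operate on
    // the same object. The cast to LeaderState<T> is unchecked because T is erased.
    @SuppressWarnings("unchecked")
    <T> Optional<LeaderState<T>> maybeLeaderState() {
        EpochState current = this.state;
        if (current instanceof LeaderState) {
            return Optional.of((LeaderState<T>) current);
        } else {
            return Optional.empty();
        }
    }
}
```

Callers can then use `maybeLeaderState().ifPresent(...)` instead of pairing a state check with the throwing accessor shown in the hunk context.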
[GitHub] [kafka] jsancio commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot
jsancio commented on a change in pull request #10899: URL: https://github.com/apache/kafka/pull/10899#discussion_r660032373 ## File path: clients/src/main/resources/common/message/SnapshotHeaderRecord.json ## @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "SnapshotHeaderRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{"name": "Version", "type": "int16", "versions": "0+", + "about": "The version of the snapshot header record"}, +{"name": "LastContainedLogTime", "type": "int64", "versions": "0+", + "about": "The append time of the highest record contained in this snapshot"} Review comment: How about "The append time of the last record from the log contained in the snapshot"? 
## File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java ## @@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception { ); } +private int validateDelimiters( +RawSnapshotReader snapshot, +long lastContainedLogTime +) { +assertNotEquals(0, snapshot.sizeInBytes()); + +int countRecords = 0; + +Iterator recordBatches = Utils.covariantCast(snapshot.records().batchIterator()); + +assertEquals(Boolean.TRUE, recordBatches.hasNext()); Review comment: You can use `assertTrue` instead. This comment applies to a few places. ## File path: clients/src/main/resources/common/message/SnapshotHeaderRecord.json ## @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "SnapshotHeaderRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{"name": "Version", "type": "int16", "versions": "0+", + "about": "The version of the snapshot header record"}, +{"name": "LastContainedLogTime", "type": "int64", "versions": "0+", Review comment: Let's call this `LastContainedLogTimestamp` to match Kafka's existing use of "Timestamp".
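The `validateDelimiters` test checks that a snapshot is framed by control records. A minimal sketch of that check, assuming a snapshot is laid out as a header record, then data records, then a footer record (the footer is an assumption here; only `SnapshotHeaderRecord` appears in this excerpt). The `Kind` enum and `countDataRecords` helper are hypothetical:

```java
import java.util.List;

final class SnapshotDelimiterSketch {
    // Hypothetical record kinds; a real snapshot stores serialized records in batches.
    enum Kind { HEADER, DATA, FOOTER }

    // Returns the number of data records, or throws if the snapshot is not framed by
    // exactly one header at the start and one footer at the end.
    static int countDataRecords(List<Kind> records) {
        if (records.size() < 2) {
            throw new IllegalStateException("Snapshot too short to contain delimiters");
        }
        if (records.get(0) != Kind.HEADER) {
            throw new IllegalStateException("First record is not a header");
        }
        if (records.get(records.size() - 1) != Kind.FOOTER) {
            throw new IllegalStateException("Last record is not a footer");
        }
        int count = 0;
        for (Kind k : records.subList(1, records.size() - 1)) {
            if (k != Kind.DATA) {
                throw new IllegalStateException("Unexpected delimiter in the middle: " + k);
            }
            count++;
        }
        return count;
    }
}
```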
## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -204,35 +207,103 @@ private void completeCurrentBatch() { currentBatch = null; } -public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { +private void appendControlMessage( +Supplier supplier, +ByteBuffer buffer +) { appendLock.lock(); try { forceDrain(); -ByteBuffer buffer = memoryPool.tryAllocate(256); -if (buffer != null) { -MemoryRecords data = MemoryRecords.withLeaderChangeMessage( -this.nextOffset, -currentTimeMs, -this.epoch, -buffer, -leaderChangeMessage -); -completed.add(new CompletedBatch<>( -nextOffset, -1, -data, -memoryPool, -buffer -)); -nextOffset += 1; -} else { -throw new IllegalStateException("Could not allocate buffer for the leader change record."); -} +completed.add(new CompletedBatch<>( +nextOffset, +1, +
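The `BatchAccumulator` hunk (truncated in this archive) folds the leader-change path into a shared `appendControlMessage` helper. A minimal sketch of that shape, not the PR's code: the `LongFunction` builder that receives the record's offset, the `CompletedBatch` class, and the `String` payloads are assumptions made to keep it self-contained (the PR's helper takes a `Supplier` and a `ByteBuffer`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongFunction;

final class ControlBatchSketch<R> {
    // Hypothetical completed batch: base offset, record count, and payload.
    static final class CompletedBatch<D> {
        final long baseOffset;
        final int numRecords;
        final D data;

        CompletedBatch(long baseOffset, int numRecords, D data) {
            this.baseOffset = baseOffset;
            this.numRecords = numRecords;
            this.data = data;
        }
    }

    private final ReentrantLock appendLock = new ReentrantLock();
    private final List<CompletedBatch<R>> completed = new ArrayList<>();
    private long nextOffset;

    ControlBatchSketch(long startOffset) {
        this.nextOffset = startOffset;
    }

    // Shared path for single-record control batches (leader change, snapshot
    // delimiters, ...). The real accumulator first drains any open data batch
    // via forceDrain(); that step is omitted here.
    void appendControlMessage(LongFunction<R> buildRecord) {
        appendLock.lock();
        try {
            completed.add(new CompletedBatch<>(nextOffset, 1, buildRecord.apply(nextOffset)));
            nextOffset += 1;
        } finally {
            appendLock.unlock();
        }
    }

    List<CompletedBatch<R>> completed() {
        return completed;
    }

    long nextOffset() {
        return nextOffset;
    }
}
```

Centralizing the lock, the batch creation, and the offset increment means each new control record type only has to say how its record is built.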
[GitHub] [kafka] jacky1193610322 commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)
jacky1193610322 commented on a change in pull request #10931: URL: https://github.com/apache/kafka/pull/10931#discussion_r660054798 ## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.TopicRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + + +/** + * Represents changes to the topics in the metadata image. + */ +public final class TopicsDelta { +private final TopicsImage image; + +/** + * A map from topic IDs to the topic deltas for each topic. Topics which have been + * deleted will not appear in this map. + */ +private final Map changedTopics = new HashMap<>(); + +/** + * The IDs of topics that exist in the image but that have been deleted. 
Note that if + * a topic does not exist in the image, it will also not exist in this set. Topics + * that are created and then deleted within the same delta will leave no trace. + */ +private final Set deletedTopicIds = new HashSet<>(); Review comment: Thanks for your reply. That is right only if we never replay a delta that has already been applied, which means we would have to persist the checkpoint offset on every commit, and currently we do not. For example, if the broker crashes, we may replay the latest snapshot and then the deltas after the snapshot offset, but some of those deltas may already have been replayed before the crash. I'm not sure whether this is right. Thank you.
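The Javadoc in this hunk describes how a delta tracks creations and deletions relative to a base image. A minimal sketch of those rules, with hypothetical string topic IDs and simplified replay methods (not the real `TopicsDelta` API): a topic created and deleted within one delta leaves no trace, and only IDs present in the base image end up in `deletedTopicIds`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class TopicsDeltaSketch {
    private final Set<String> imageTopicIds;                            // topic IDs in the base image
    private final Map<String, String> changedTopics = new HashMap<>();  // topic ID -> topic name
    private final Set<String> deletedTopicIds = new HashSet<>();

    TopicsDeltaSketch(Set<String> imageTopicIds) {
        this.imageTopicIds = imageTopicIds;
    }

    // Loosely corresponds to replaying a TopicRecord.
    void replayCreate(String topicId, String name) {
        changedTopics.put(topicId, name);
    }

    // Loosely corresponds to replaying a RemoveTopicRecord.
    void replayRemove(String topicId) {
        changedTopics.remove(topicId);
        if (imageTopicIds.contains(topicId)) {
            deletedTopicIds.add(topicId);
        }
    }

    Map<String, String> changedTopics() {
        return changedTopics;
    }

    Set<String> deletedTopicIds() {
        return deletedTopicIds;
    }
}
```

In this sketch, replaying the same removal twice leaves the same result because both updates are map and set operations. Whether re-applying records after a crash is always safe for the real image is the question the comment raises, and the sketch does not settle it.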
[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660016282 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -193,18 +197,22 @@ class CachedPartition(val topic: String, * Each fetch session is protected by its own lock, which must be taken before mutable * fields are read or modified. This includes modification of the session partition map. * - * @param id The unique fetch session ID. - * @param privileged True if this session is privileged. Sessions crated by followers - * are privileged; sesssion created by consumers are not. - * @param partitionMap The CachedPartitionMap. - * @param creationMs The time in milliseconds when this session was created. - * @param lastUsedMs The last used time in milliseconds. This should only be updated by - * FetchSessionCache#touch. - * @param epochThe fetch session sequence number. + * @param id The unique fetch session ID. + * @param privileged True if this session is privileged. Sessions crated by followers + * are privileged; session created by consumers are not. + * @param partitionMap The CachedPartitionMap. + * @param usesTopicIds True if this session is using topic IDs + * @param sessionTopicIdsThe mapping from topic name to topic ID for topics in the session. + * @param creationMs The time in milliseconds when this session was created. + * @param lastUsedMs The last used time in milliseconds. This should only be updated by + * FetchSessionCache#touch. + * @param epoch The fetch session sequence number. */ class FetchSession(val id: Int, val privileged: Boolean, val partitionMap: FetchSession.CACHE_MAP, + val usesTopicIds: Boolean, + val sessionTopicIds: FetchSession.TOPIC_ID_MAP, Review comment: Do we need to include the new fields in toString()? 
## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -231,20 +239,31 @@ class FetchSession(val id: Int, def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) } def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized { -Option(partitionMap.find(new CachedPartition(topicPartition))).map(_.fetchOffset) +Option(partitionMap.find(new CachedPartition(topicPartition, + sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset) } type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. def update(fetchData: FetchSession.REQ_MAP, toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL +val inconsistentTopicIds = new TL fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID. + // If the topic already existed, check that its ID is consistent. + val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID) + val newCachedPart = new CachedPartition(topicPart, id, reqData) + if (id != Uuid.ZERO_UUID) { +val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id) Review comment: It seems that we should never change the topicId in sessionTopicIds? Perhaps we should use putIfAbsent. Similarly, if the topicId changes, I am not sure if we should update partitionMap below. ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -471,16 +505,19 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}.
Possible duplicate request.") -FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) +FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { +var error = Errors.NONE Review comment: error => topLevelError? ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch,
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r660039117 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -192,20 +230,10 @@ object LogLoader extends Logging { debug(s"${params.logIdentifier}Deleting stray temporary file ${file.getAbsolutePath}") Files.deleteIfExists(file.toPath) } else if (filename.endsWith(CleanedFileSuffix)) { -minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) -cleanFiles += file +minCleanedFileOffset = Math.min(offsetFromFile(file), minCleanedFileOffset) +cleanedFiles += file } else if (filename.endsWith(SwapFileSuffix)) { -// we crashed in the middle of a swap operation, to recover: -// if a log, delete the index files, complete the swap operation later -// if an index just delete the index files, they will be rebuilt -val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) -info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from interrupted swap operation.") -if (Log.isIndexFile(baseFile)) { - deleteIndicesIfExist(baseFile) -} else if (Log.isLogFile(baseFile)) { - deleteIndicesIfExist(baseFile) - swapFiles += file -} +swapFiles += file Review comment: Due to KAFKA-6264, if there are any .cleaned files (regardless of whether they are .index.cleaned or .log.cleaned), we delete all .cleaned files and all .swap files whose base offsets are greater than or equal to the smallest .cleaned base offset. Basically, this reverts any ongoing compaction or split operations. Therefore, we don't have any additional .index.cleaned files. Is that fair?
[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r660039117 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -192,20 +230,10 @@ object LogLoader extends Logging { debug(s"${params.logIdentifier}Deleting stray temporary file ${file.getAbsolutePath}") Files.deleteIfExists(file.toPath) } else if (filename.endsWith(CleanedFileSuffix)) { -minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) -cleanFiles += file +minCleanedFileOffset = Math.min(offsetFromFile(file), minCleanedFileOffset) +cleanedFiles += file } else if (filename.endsWith(SwapFileSuffix)) { -// we crashed in the middle of a swap operation, to recover: -// if a log, delete the index files, complete the swap operation later -// if an index just delete the index files, they will be rebuilt -val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) -info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from interrupted swap operation.") -if (Log.isIndexFile(baseFile)) { - deleteIndicesIfExist(baseFile) -} else if (Log.isLogFile(baseFile)) { - deleteIndicesIfExist(baseFile) - swapFiles += file -} +swapFiles += file Review comment: Due to KAFKA-6264, if there are any .cleaned files (regardless of whether they are .index.cleaned or .log.cleaned), we delete all .cleaned files and all .swap files whose base offsets are greater than or equal to the smallest .cleaned base offset. Basically, this reverts any ongoing compaction or split operations. Therefore, at this point, we don't have any .cleaned files. Is that fair?
[GitHub] [kafka] mumrah commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
mumrah commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r660029217 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2250,7 +2249,40 @@ private Long append(int epoch, List records, boolean isAtomic) { @Override public void resign(int epoch) { -throw new UnsupportedOperationException(); +if (epoch < 0) { +throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch); +} + +if (!quorum.isVoter()) { +throw new IllegalArgumentException("Attempt to resign by a non-voter"); +} + +LeaderAndEpoch leaderAndEpoch = leaderAndEpoch(); +int currentEpoch = leaderAndEpoch.epoch(); + +if (epoch > currentEpoch) { +throw new IllegalArgumentException("Attempt to resign from epoch " + epoch + +" which is larger than the current epoch " + currentEpoch); +} else if (epoch < currentEpoch) { +// If the passed epoch is smaller than the current epoch, then it might mean +// that the listener has not been notified about a leader change that already +// took place. In this case, we consider the call as already fulfilled and +// take no further action. +return; +} else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) { Review comment: I see what you mean, and yeah, that is a fair point.
[GitHub] [kafka] rajinisivaram commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs
rajinisivaram commented on pull request #10930: URL: https://github.com/apache/kafka/pull/10930#issuecomment-869929738 @guozhangwang With Errors.NONE, we throw OffsetOutOfRangeException in the follower when attempting to update the follower's start offset based on the leader's start offset returned in the response: https://github.com/apache/kafka/blob/397fa1f894c176d71601183c36e5d498fc83fd1e/core/src/main/scala/kafka/log/Log.scala#L997. Since that is a safeguard that existed prior to the new code in the leader to process diverging epochs for IBP 2.7 and higher, it seems safer to retain it.
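A minimal sketch of the leader-side ordering named in the PR title: the `fetchOffset < logStartOffset` check runs before diverging-epoch handling, so OFFSET_OUT_OF_RANGE is returned even when the epoch also diverges. `FetchCheck`, its `Outcome` values, and the boolean `epochDiverges` input are hypothetical simplifications of the broker's fetch path.

```java
// Hypothetical model of the check order described in the PR title.
final class FetchCheck {
    enum Outcome {
        OFFSET_OUT_OF_RANGE,
        DIVERGING_EPOCH, // stands for Errors.NONE plus a diverging-epoch field in the response
        NONE
    }

    static Outcome check(long fetchOffset, long logStartOffset, boolean epochDiverges) {
        // Checked first, so this error wins even when the epoch also diverges.
        if (fetchOffset < logStartOffset) {
            return Outcome.OFFSET_OUT_OF_RANGE;
        }
        if (epochDiverges) {
            return Outcome.DIVERGING_EPOCH;
        }
        return Outcome.NONE;
    }
}
```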
[GitHub] [kafka] mdedetrich edited a comment on pull request #10652: KAFKA-9726 IdentityReplicationPolicy
mdedetrich edited a comment on pull request #10652: URL: https://github.com/apache/kafka/pull/10652#issuecomment-869693584 Is there anything left before this PR can be merged (apart from the changelog, which is a nice-to-have)?
[GitHub] [kafka] mumrah commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
mumrah commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r660025194 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) { LeaderState state = quorum.leaderStateOrThrow(); maybeFireLeaderChange(state); -GracefulShutdown shutdown = this.shutdown.get(); -if (shutdown != null) { +if (shutdown.get() != null || state.isResignRequested()) { Review comment: Works for me
[GitHub] [kafka] hachikuji commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
hachikuji commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r660024276 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) { LeaderState state = quorum.leaderStateOrThrow(); maybeFireLeaderChange(state); -GracefulShutdown shutdown = this.shutdown.get(); -if (shutdown != null) { +if (shutdown.get() != null || state.isResignRequested()) { Review comment: If you don't mind, let's do this refactor separately. There are a fair number of uses that would benefit from having `Optional`.
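mumrah's suggestion, which hachikuji prefers to handle as a separate refactor, is to replace `shutdown.get() != null` checks with a helper that returns `Optional`. A minimal sketch, assuming `shutdown` is an `AtomicReference` (the diff shows only `shutdown.get()`); `GracefulShutdown` here is a stand-in, and `maybeShutdown` and `shouldStepDown` are hypothetical names.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

final class ShutdownHolder {
    // Stand-in for the real GracefulShutdown type.
    static final class GracefulShutdown {
        final long deadlineMs;
        GracefulShutdown(long deadlineMs) { this.deadlineMs = deadlineMs; }
    }

    private final AtomicReference<GracefulShutdown> shutdown = new AtomicReference<>();

    // Hypothetical helper: callers test isPresent() instead of comparing with null.
    Optional<GracefulShutdown> maybeShutdown() {
        return Optional.ofNullable(shutdown.get());
    }

    // Only the first shutdown request is recorded.
    void beginShutdown(long deadlineMs) {
        shutdown.compareAndSet(null, new GracefulShutdown(deadlineMs));
    }

    // Mirrors the pollLeader condition: a shutdown is pending or a resign was requested.
    boolean shouldStepDown(boolean resignRequested) {
        return maybeShutdown().isPresent() || resignRequested;
    }
}
```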
[GitHub] [kafka] jsancio commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
jsancio commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r660023057 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -514,6 +514,16 @@ public UnattachedState unattachedStateOrThrow() { throw new IllegalStateException("Expected to be Leader, but current state is " + state); } +@SuppressWarnings("unchecked") +public Optional<LeaderState<T>> maybeLeaderState() { +EpochState state = this.state; +if (state instanceof LeaderState) { +return Optional.of((LeaderState<T>) state); +} else { +return Optional.empty(); +} +} Review comment: Thank you. You read my mind. I was thinking of something similar after I left my previous comment.
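The `maybeLeaderState()` method in the diff turns an `instanceof` check into an `Optional`. Below is a simplified, self-contained version of the same pattern with a caller; `EpochState`, `LeaderState`, `FollowerState`, `requestResign`, and `transitionTo` are stand-ins rather than the real Raft classes, and generics are dropped.

```java
import java.util.Optional;

final class QuorumStateSketch {
    interface EpochState { int epoch(); }

    static final class LeaderState implements EpochState {
        private final int epoch;
        private boolean resignRequested = false;
        LeaderState(int epoch) { this.epoch = epoch; }
        public int epoch() { return epoch; }
        void requestResign() { resignRequested = true; }
        boolean isResignRequested() { return resignRequested; }
    }

    static final class FollowerState implements EpochState {
        private final int epoch;
        FollowerState(int epoch) { this.epoch = epoch; }
        public int epoch() { return epoch; }
    }

    private volatile EpochState state;

    QuorumStateSketch(EpochState initial) { this.state = initial; }

    void transitionTo(EpochState next) { this.state = next; }

    // Same shape as maybeLeaderState() in the diff: read the field once,
    // then narrow it to LeaderState only if the type matches.
    Optional<LeaderState> maybeLeaderState() {
        EpochState current = this.state;
        if (current instanceof LeaderState) {
            return Optional.of((LeaderState) current);
        }
        return Optional.empty();
    }
}
```

Copying `this.state` into a local before the `instanceof` check, as the diff does, means the check and the cast see the same object even if another thread reassigns the field in between.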
[GitHub] [kafka] hachikuji commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
hachikuji commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r660021011 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2250,7 +2249,40 @@ private Long append(int epoch, List records, boolean isAtomic) { @Override public void resign(int epoch) { -throw new UnsupportedOperationException(); +if (epoch < 0) { +throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch); +} + +if (!quorum.isVoter()) { +throw new IllegalArgumentException("Attempt to resign by a non-voter"); +} + +LeaderAndEpoch leaderAndEpoch = leaderAndEpoch(); +int currentEpoch = leaderAndEpoch.epoch(); + +if (epoch > currentEpoch) { +throw new IllegalArgumentException("Attempt to resign from epoch " + epoch + +" which is larger than the current epoch " + currentEpoch); +} else if (epoch < currentEpoch) { +// If the passed epoch is smaller than the current epoch, then it might mean +// that the listener has not been notified about a leader change that already +// took place. In this case, we consider the call as already fulfilled and +// take no further action. +return; +} else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) { Review comment: If the epoch has moved on, then the leader check is likely to fail, so the current order seems to make sense. We don't keep a history of previous states, so I think the best we can do is catch cases where the passed epoch does not make sense with the current state.
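The check order being debated can be modeled as a pure function. The sketch makes the trade-off concrete: a node that passes an epoch older than the current one returns early through the stale-epoch branch, so the leadership check never runs. That is the case mumrah raises, and hachikuji argues it is acceptable because no history of previous states is kept. `ResignCheck` and its `Outcome` values are hypothetical; the body of the not-leader branch is not shown in the diff, so rejecting there is an assumption.

```java
// Hypothetical model of the checks in resign(epoch), in the order shown in the diff.
final class ResignCheck {
    enum Outcome {
        REJECT_NEGATIVE_EPOCH, // epoch < 0
        REJECT_NON_VOTER,      // local node is not a voter
        REJECT_FUTURE_EPOCH,   // epoch > current epoch
        IGNORE_STALE_EPOCH,    // epoch < current epoch: treated as already fulfilled
        REJECT_NOT_LEADER,     // assumption: this branch's body is not in the diff
        RESIGN
    }

    static Outcome check(int epoch, boolean isVoter, int currentEpoch, boolean isLeader) {
        if (epoch < 0) return Outcome.REJECT_NEGATIVE_EPOCH;
        if (!isVoter) return Outcome.REJECT_NON_VOTER;
        if (epoch > currentEpoch) return Outcome.REJECT_FUTURE_EPOCH;
        if (epoch < currentEpoch) return Outcome.IGNORE_STALE_EPOCH;
        if (!isLeader) return Outcome.REJECT_NOT_LEADER;
        return Outcome.RESIGN;
    }
}
```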
[GitHub] [kafka] hachikuji commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
hachikuji commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r660007360 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) { LeaderState state = quorum.leaderStateOrThrow(); maybeFireLeaderChange(state); -GracefulShutdown shutdown = this.shutdown.get(); -if (shutdown != null) { +if (shutdown.get() != null || state.isResignRequested()) { Review comment: That's fair. Let me see if it's reasonable to do this here or if we should push to a separate PR.
[GitHub] [kafka] mumrah commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`
mumrah commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r659989672 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2250,7 +2250,42 @@ private Long append(int epoch, List records, boolean isAtomic) { @Override public void resign(int epoch) { -throw new UnsupportedOperationException(); +if (epoch < 0) { +throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch); +} + +if (!quorum.isVoter()) { +throw new IllegalArgumentException("Attempt to resign by a non-voter"); Review comment: Would IllegalStateException be better here? ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) { LeaderState state = quorum.leaderStateOrThrow(); maybeFireLeaderChange(state); -GracefulShutdown shutdown = this.shutdown.get(); -if (shutdown != null) { +if (shutdown.get() != null || state.isResignRequested()) { Review comment: Unrelated, but it may be worth creating a helper method that returns `Optional` to avoid these null checks throughout. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2250,7 +2249,40 @@ private Long append(int epoch, List records, boolean isAtomic) { @Override public void resign(int epoch) { -throw new UnsupportedOperationException(); +if (epoch < 0) { +throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch); +} + +if (!quorum.isVoter()) { +throw new IllegalArgumentException("Attempt to resign by a non-voter"); +} + +LeaderAndEpoch leaderAndEpoch = leaderAndEpoch(); +int currentEpoch = leaderAndEpoch.epoch(); + +if (epoch > currentEpoch) { +throw new IllegalArgumentException("Attempt to resign from epoch " + epoch + +" which is larger than the current epoch " + currentEpoch); +} else if (epoch < currentEpoch) { +// If the passed epoch is smaller than the current epoch, then it might mean +// that the listener has not been notified about a leader change that already +// took place. In this case, we consider the call as already fulfilled and +// take no further action. +return; +} else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) { Review comment: Does it make sense to do this check before the epoch validation? If we're not the leader and receive an old epoch (which, if I understand correctly, seems likely if we're _not_ the leader anymore), we will silently ignore it in the above case.
[GitHub] [kafka] mimaison commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy
mimaison commented on pull request #10652: URL: https://github.com/apache/kafka/pull/10652#issuecomment-869893715 Let's put a line in the changelog now so we're sure it's included in the release notes. I'm happy to merge once this is done. I agree the documentation can come later.
[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r659995458 ## File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Collection; + +/** + * A class used to represent a collection of topics. This collection may define topics by topic name + * attribute or topic ID attribute. Subclassing this class beyond the classes provided here is not supported. + */ +public abstract class TopicCollection { +private final TopicAttribute attribute; + +/** + * An enum used to describe how topics in the collection are identified + */ +public enum TopicAttribute { Review comment: I kept this TopicAttribute field for now, though we could case on the class (TopicIdCollection/TopicNameCollection). One reason I didn't remove it is that the result class(es) still need it, since we are building off the existing class. If there is a way to remove it from the result class, or a better place to put the attribute, let me know.
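jolshan notes that callers could case on the subclass instead of keeping `TopicAttribute`. A simplified sketch of both options side by side; the class shapes are modeled on the names in the diff but are assumptions, `String` stands in for `Uuid`, and `TopicCollectionDispatch` is a hypothetical helper.

```java
import java.util.Collection;

// Simplified stand-in for the TopicCollection hierarchy in the diff.
abstract class TopicCollection {
    enum TopicAttribute { TOPIC_NAME, TOPIC_ID }

    private final TopicAttribute attribute;

    TopicCollection(TopicAttribute attribute) { this.attribute = attribute; }

    TopicAttribute attribute() { return attribute; }
}

final class TopicNameCollection extends TopicCollection {
    final Collection<String> names;
    TopicNameCollection(Collection<String> names) {
        super(TopicAttribute.TOPIC_NAME);
        this.names = names;
    }
}

// String stands in for org.apache.kafka.common.Uuid to keep the sketch self-contained.
final class TopicIdCollection extends TopicCollection {
    final Collection<String> ids;
    TopicIdCollection(Collection<String> ids) {
        super(TopicAttribute.TOPIC_ID);
        this.ids = ids;
    }
}

final class TopicCollectionDispatch {
    // Option 1: switch on the enum field.
    static String describeByAttribute(TopicCollection topics) {
        switch (topics.attribute()) {
            case TOPIC_NAME: return "names=" + ((TopicNameCollection) topics).names;
            case TOPIC_ID: return "ids=" + ((TopicIdCollection) topics).ids;
            default: throw new IllegalArgumentException("Unknown attribute " + topics.attribute());
        }
    }

    // Option 2: case on the subclass; no enum needed.
    static String describeByClass(TopicCollection topics) {
        if (topics instanceof TopicNameCollection) {
            return "names=" + ((TopicNameCollection) topics).names;
        } else if (topics instanceof TopicIdCollection) {
            return "ids=" + ((TopicIdCollection) topics).ids;
        }
        throw new IllegalArgumentException("Unsupported TopicCollection subclass " + topics.getClass());
    }
}
```

The `instanceof` version needs no enum, but it only stays exhaustive because subclassing beyond the provided classes is unsupported, as the Javadoc in the diff states.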
[GitHub] [kafka] thomaskwscott opened a new pull request #10936: KAFKA-13002 Fix for non max timestamp degrade case
thomaskwscott opened a new pull request #10936: URL: https://github.com/apache/kafka/pull/10936 KAFKA-12541 introduced a regression for listOffsets requests with non-max-timestamp specs when communicating with old brokers. This PR addresses this case. Tested with a new unit test for the regression case. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-10774) Support Describe topic using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370749#comment-17370749 ] Justine Olshan commented on KAFKA-10774: Removed the reference to describe topics from this ticket: https://issues.apache.org/jira/browse/KAFKA-12976. But we will want to return only UNKNOWN_TOPIC_ID, not UNSUPPORTED_VERSION. > Support Describe topic using topic IDs > -- > > Key: KAFKA-10774 > URL: https://issues.apache.org/jira/browse/KAFKA-10774 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Assignee: dengziming >Priority: Major > > Similar to KAFKA-10547 which added topic IDs in MetadataResp, we add topic IDs > to MetadataReq and can get TopicDesc by topic IDs
[jira] [Updated] (KAFKA-12976) Remove UNSUPPORTED_VERSION error from delete topics call
[ https://issues.apache.org/jira/browse/KAFKA-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12976: Fix Version/s: 2.8.1 3.0.0 > Remove UNSUPPORTED_VERSION error from delete topics call > > > Key: KAFKA-12976 > URL: https://issues.apache.org/jira/browse/KAFKA-12976 > Project: Kafka > Issue Type: Task >Affects Versions: 3.0.0, 2.8.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.0.0, 2.8.1 > > > Originally I thought it would be useful to have an unsupported version error > returned when the broker's IBP did not support the operation. However, this > error is transient and in the -case of describe topics, it may not be > accurate.- Additionally, unsupported version is not retriable when the > scenarios that see this likely should be. > I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not > found on the broker.
[jira] [Resolved] (KAFKA-12976) Remove UNSUPPORTED_VERSION error from delete topics call
[ https://issues.apache.org/jira/browse/KAFKA-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12976. - Resolution: Fixed > Remove UNSUPPORTED_VERSION error from delete topics call > > > Key: KAFKA-12976 > URL: https://issues.apache.org/jira/browse/KAFKA-12976 > Project: Kafka > Issue Type: Task >Affects Versions: 3.0.0, 2.8.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.0.0, 2.8.1 > > > Originally I thought it would be useful to have an unsupported version error > returned when the broker's IBP did not support the operation. However, this > error is transient and in the -case of describe topics, it may not be > accurate.- Additionally, unsupported version is not retriable when the > scenarios that see this likely should be. > I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not > found on the broker.
[jira] [Updated] (KAFKA-12976) Remove UNSUPPORTED_VERSION error from delete topics call
[ https://issues.apache.org/jira/browse/KAFKA-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-12976: --- Affects Version/s: 3.0.0 2.8.0 > Remove UNSUPPORTED_VERSION error from delete topics call > > > Key: KAFKA-12976 > URL: https://issues.apache.org/jira/browse/KAFKA-12976 > Project: Kafka > Issue Type: Task >Affects Versions: 3.0.0, 2.8.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > Originally I thought it would be useful to have an unsupported version error > returned when the broker's IBP did not support the operation. However, this > error is transient and in the -case of describe topics, it may not be > accurate.- Additionally, unsupported version is not retriable when the > scenarios that see this likely should be. > I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not > found on the broker.
[jira] [Updated] (KAFKA-12976) Remove UNSUPPORTED_VERSION error from delete topics call
[ https://issues.apache.org/jira/browse/KAFKA-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-12976: --- Description: Originally I thought it would be useful to have an unsupported version error returned when the broker's IBP did not support the operation. However, this error is transient and in the -case of describe topics, it may not be accurate.- Additionally, unsupported version is not retriable when the scenarios that see this likely should be. I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not found on the broker. was: Originally I thought it would be useful to have an unsupported version error returned when the broker's IBP did not support the operation. However, this error is transient and in the case of describe topics, it may not be accurate. Additionally, unsupported version is not retriable when the scenarios that see this likely should be. I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not found on the broker. Summary: Remove UNSUPPORTED_VERSION error from delete topics call (was: Remove UNSUPPORTED_VERSION error from delete and describe topics calls) > Remove UNSUPPORTED_VERSION error from delete topics call > > > Key: KAFKA-12976 > URL: https://issues.apache.org/jira/browse/KAFKA-12976 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > Originally I thought it would be useful to have an unsupported version error > returned when the broker's IBP did not support the operation. However, this > error is transient and in the -case of describe topics, it may not be > accurate.- Additionally, unsupported version is not retriable when the > scenarios that see this likely should be. > I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not > found on the broker.
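The KAFKA-12976 proposal reduces to one rule when a delete-by-ID request names a topic ID the broker does not know: answer UNKNOWN_TOPIC_ID. A minimal sketch; `DeleteByIdErrors`, `ErrorCode`, and the `retriable` flag are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and the retriable values only mirror the ticket's point that UNSUPPORTED_VERSION is not retriable.

```java
import java.util.Map;
import java.util.UUID;

final class DeleteByIdErrors {
    enum ErrorCode {
        NONE(false), UNKNOWN_TOPIC_ID(true), UNSUPPORTED_VERSION(false);

        final boolean retriable;

        ErrorCode(boolean retriable) { this.retriable = retriable; }
    }

    // Previously, a broker whose IBP did not support the operation could answer
    // UNSUPPORTED_VERSION; under the proposal an unknown ID is always UNKNOWN_TOPIC_ID.
    static ErrorCode errorFor(UUID topicId, Map<UUID, String> knownTopics) {
        return knownTopics.containsKey(topicId) ? ErrorCode.NONE : ErrorCode.UNKNOWN_TOPIC_ID;
    }
}
```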
[GitHub] [kafka] hachikuji merged pull request #10923: KAFKA-12976: Remove UNSUPPORTED_VERSION error from delete topics call
hachikuji merged pull request #10923: URL: https://github.com/apache/kafka/pull/10923
[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
junrao commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r659963924 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,58 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction or segment split operation. We can +// simply rename them to regular segment files. But, before renaming, we should figure out which +// segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. Review comment: "which segments are compacted": .swap files are also generated from splitting. 
## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -192,20 +230,10 @@ object LogLoader extends Logging { debug(s"${params.logIdentifier}Deleting stray temporary file ${file.getAbsolutePath}") Files.deleteIfExists(file.toPath) } else if (filename.endsWith(CleanedFileSuffix)) { -minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) -cleanFiles += file +minCleanedFileOffset = Math.min(offsetFromFile(file), minCleanedFileOffset) +cleanedFiles += file } else if (filename.endsWith(SwapFileSuffix)) { -// we crashed in the middle of a swap operation, to recover: -// if a log, delete the index files, complete the swap operation later -// if an index just delete the index files, they will be rebuilt -val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) -info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from interrupted swap operation.") -if (Log.isIndexFile(baseFile)) { - deleteIndicesIfExist(baseFile) -} else if (Log.isLogFile(baseFile)) { - deleteIndicesIfExist(baseFile) - swapFiles += file -} +swapFiles += file Review comment: It's possible that during renaming, we have only renamed the .log file to .swap, but not the corresponding index files. Should we find those .cleaned files with the same offset and rename them to .swap? ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,58 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction or segment split operation.
We can +// simply rename them to regular segment files. But, before renaming, we should figure out which +// segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. +// We store segments that require renaming in this code block, and do the actual renaming later. +var minSwapFileOffset = Long.MaxValue +var maxSwapFileOffset = Long.MinValue +swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f => + val baseOffset = offsetFromFile(f) + val segment = LogSegment.open(f.getParentFile, +baseOffset = baseOffset, +params.config, +time = params.time, +fileSuffix = Log.SwapFileSuffix) + info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.") + minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset) + maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset) +} + +// Second pass: delete segments that are between minSwapFileOffset and maxSwapFileOffset. As +// discussed above, these segments were compacted or split but haven't been renamed to .delete +// before shutting down the broker. +for (file <- params.dir.listFiles if file.isFile) { + try { +if (!file.getName.endsWith(SwapFileSuffix)) { + val offset = offsetFromFile(file) + if (offset >=
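The first two passes of the new `load` can be sketched without touching the file system: take the offset range covered by the `.log.swap` segments, then select the regular segments whose base offset falls inside it. A minimal sketch; `SwapRange` and `Segment` are hypothetical, and the range is assumed inclusive because the diff is cut off at `if (offset >=`.

```java
import java.util.ArrayList;
import java.util.List;

final class SwapRange {
    // Hypothetical segment descriptor: base offset and last offset from its index.
    static final class Segment {
        final String name;
        final long baseOffset;
        final long lastOffset;
        Segment(String name, long baseOffset, long lastOffset) {
            this.name = name;
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Names of non-swap segments to delete before the .swap segments are renamed,
    // i.e. the segments replaced by an interrupted compaction or split.
    static List<String> segmentsToDelete(List<Segment> swapSegments, List<Segment> regularSegments) {
        long minSwapFileOffset = Long.MAX_VALUE;
        long maxSwapFileOffset = Long.MIN_VALUE;
        for (Segment swap : swapSegments) {
            minSwapFileOffset = Math.min(minSwapFileOffset, swap.baseOffset);
            maxSwapFileOffset = Math.max(maxSwapFileOffset, swap.lastOffset);
        }
        List<String> toDelete = new ArrayList<>();
        for (Segment segment : regularSegments) {
            // Inclusive bounds are an assumption; with no swap segments nothing matches.
            if (segment.baseOffset >= minSwapFileOffset && segment.baseOffset <= maxSwapFileOffset) {
                toDelete.add(segment.name);
            }
        }
        return toDelete;
    }
}
```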
[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.
[ https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370724#comment-17370724 ] Jun Rao commented on KAFKA-2729: [~l0co], thanks for reporting this. The "Cached zkVersion [212]" error indicates the leader epoch was changed by the controller but somehow wasn't propagated to the broker. Could you grep for "Partition __consumer_offsets-30" in the controller and state-change logs and see which controller changed the leader epoch corresponding to zk version 212 and whether the controller tried to propagate that info to the brokers? > Cached zkVersion not equal to that in zookeeper, broker not recovering. > --- > > Key: KAFKA-2729 > URL: https://issues.apache.org/jira/browse/KAFKA-2729 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0, 2.4.1 >Reporter: Danil Serdyuchenko >Assignee: Onur Karaman >Priority: Critical > Fix For: 1.1.0 > > > After a small network wobble where zookeeper nodes couldn't reach each other, > we started seeing a large number of under-replicated partitions. The zookeeper > cluster recovered, however we continued to see a large number of > under-replicated partitions. Two brokers in the kafka cluster were showing this > in the logs: > {code} > [2015-10-27 11:36:00,888] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for > partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 > (kafka.cluster.Partition) > [2015-10-27 11:36:00,891] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] > not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) > {code} > For all of the topics on the affected brokers. Both brokers only recovered > after a restart. Our own investigation yielded nothing, I was hoping you > could shed some light on this issue.
Possibly if it's related to: > https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using > 0.8.2.1. -- This message was sent by Atlassian Jira (v8.3.4#803005)
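The "Cached zkVersion [66] not equal to that in zookeeper, skip updating ISR" line comes from a conditional write: the broker only applies its ISR change if the version it cached still matches the stored version. A minimal sketch of that compare-and-set pattern follows, assuming an in-memory stand-in for the ZooKeeper node; `VersionedNode`, `PartitionStateCache`, and their methods are hypothetical and are not the ZooKeeper or Kafka APIs. It illustrates why a broker whose cached version is never refreshed keeps skipping the update, which is consistent with Jun Rao's point that the new epoch apparently never reached the broker.

```java
import java.util.Arrays;
import java.util.List;

class ZkVersionSketch {
    public static void main(String[] args) {
        VersionedNode node = new VersionedNode(Arrays.asList(6, 5), 66);
        PartitionStateCache broker = new PartitionStateCache(66);
        // Another writer (standing in for the controller) bumps the version to 67,
        // but the broker's cached version is not refreshed.
        node.externalUpdate(Arrays.asList(6, 5));
        System.out.println(broker.tryUpdateIsr(node, Arrays.asList(5))); // false, update skipped
        // Once the cache is refreshed, the conditional write succeeds again.
        broker.refresh(node.version());
        System.out.println(broker.tryUpdateIsr(node, Arrays.asList(5))); // true
    }
}

// Hypothetical in-memory stand-in for a versioned ZooKeeper node (not the ZooKeeper API).
final class VersionedNode {
    private List<Integer> isr;
    private int version;

    VersionedNode(List<Integer> isr, int version) {
        this.isr = isr;
        this.version = version;
    }

    // Conditional write: applies only if expectedVersion matches the stored version.
    // Returns the new version on success, or -1 on a mismatch.
    synchronized int setIfVersion(List<Integer> newIsr, int expectedVersion) {
        if (expectedVersion != version) {
            return -1;
        }
        isr = newIsr;
        return ++version;
    }

    // Unconditional write by some other party, which also bumps the version.
    synchronized void externalUpdate(List<Integer> newIsr) {
        isr = newIsr;
        version++;
    }

    synchronized int version() { return version; }

    synchronized List<Integer> isr() { return isr; }
}

// Hypothetical broker-side view that caches the node version it last saw.
final class PartitionStateCache {
    private int cachedVersion;

    PartitionStateCache(int cachedVersion) { this.cachedVersion = cachedVersion; }

    // Attempts an ISR update; skips it (keeping the stale cache) on a version mismatch.
    boolean tryUpdateIsr(VersionedNode node, List<Integer> newIsr) {
        int newVersion = node.setIfVersion(newIsr, cachedVersion);
        if (newVersion < 0) {
            System.out.println("Cached zkVersion [" + cachedVersion
                + "] not equal to that in zookeeper, skip updating ISR");
            return false;
        }
        cachedVersion = newVersion;
        return true;
    }

    // Models the broker learning the latest version from propagated metadata.
    void refresh(int latestVersion) { cachedVersion = latestVersion; }
}
```

Without a refresh, every retry compares against the same stale version, so the skip repeats indefinitely in this model.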
[GitHub] [kafka] cmccabe commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)
cmccabe commented on a change in pull request #10931: URL: https://github.com/apache/kafka/pull/10931#discussion_r659940260 ## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.TopicRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + + +/** + * Represents changes to the topics in the metadata image. + */ +public final class TopicsDelta { +private final TopicsImage image; + +/** + * A map from topic IDs to the topic deltas for each topic. Topics which have been + * deleted will not appear in this map. + */ +private final Map changedTopics = new HashMap<>(); + +/** + * The IDs of topics that exist in the image but that have been deleted. 
Note that if + * a topic does not exist in the image, it will also not exist in this set. Topics + * that are created and then deleted within the same delta will leave no trace. + */ +private final Set deletedTopicIds = new HashSet<>(); Review comment: If the topic is created and then deleted within the same delta, the topic log dir is never created, so there is no need for it to be deleted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ryannedolan commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy
ryannedolan commented on pull request #10652: URL: https://github.com/apache/kafka/pull/10652#issuecomment-869824087 I think changelog and documentation updates can come after this is merged.
[GitHub] [kafka] ryannedolan commented on pull request #10629: BlockingConnectorTest improvements to verify Connectors and Tasks are successfully deleted
ryannedolan commented on pull request #10629: URL: https://github.com/apache/kafka/pull/10629#issuecomment-869823002 Good point, @C0urante; I hadn't considered how expensive these tests can be. I'll see if I can collapse them into fewer tests. Thanks.
[GitHub] [kafka] C0urante commented on a change in pull request #10629: BlockingConnectorTest improvements to verify Connectors and Tasks are successfully deleted
C0urante commented on a change in pull request #10629: URL: https://github.com/apache/kafka/pull/10629#discussion_r659903390 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java ## @@ -392,6 +392,48 @@ protected boolean checkConnectorAndTasksAreStopped(String connectorName) { && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString())); } +/** + * Assert that a connector and its tasks are deleted. + * + * @param connectorName the connector name + * @param detailMessage the assertion message + * @throws InterruptedException + */ +public void assertConnectorAndTasksAreDeleted(String connectorName, String detailMessage) Review comment: Nit: do we really need the `AndTasks` section? Might be fine to use `assertConnectorIsDeleted` instead. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java ## @@ -392,6 +392,48 @@ protected boolean checkConnectorAndTasksAreStopped(String connectorName) { && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString())); } +/** + * Assert that a connector and its tasks are deleted. + * + * @param connectorName the connector name + * @param detailMessage the assertion message + * @throws InterruptedException + */ +public void assertConnectorAndTasksAreDeleted(String connectorName, String detailMessage) +throws InterruptedException { +try { +waitForCondition( +() -> checkConnectorAndTasksAreDeleted(connectorName), +CONNECTOR_SETUP_DURATION_MS, +"At least the connector or one of its tasks still exists."); +} catch (AssertionError e) { +throw new AssertionError(detailMessage, e); +} +} + +/** + * Check whether the connector or any of its tasks still exist. 
+ * + * @param connectorName the connector + * @return true if the connector and all the tasks are not in RUNNING state; false otherwise + */ +protected boolean checkConnectorAndTasksAreDeleted(String connectorName) { +ConnectorStateInfo info; +try { +info = connect.connectorStatus(connectorName); +} catch (ConnectRestException e) { +return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode(); +} catch (Exception e) { +log.error("Could not check connector state info.", e); +return false; +} +if (info == null) { +return true; +} Review comment: Do we know when this might happen? My understanding is that part of the contract we're trying to test here is that Connect gives back a 404 response when a stalled connector is deleted; do we want to relax our expectations to also permit this case? ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java ## @@ -596,6 +662,8 @@ public void start(Map props) { @Override public List poll() { block.maybeBlockOn(SOURCE_TASK_POLL); +// even when not blocking, pause to prevent a tight loop +Utils.sleep(1000); Review comment: Good call
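C0urante's question is whether a `null` status should also count as "deleted", or whether only the 404 response is part of the contract under test. A minimal sketch of the stricter reading, where only a 404 counts, assuming a hypothetical `ConnectorStatusClient` interface and `StatusException` in place of the `connect.connectorStatus(...)` call and `ConnectRestException` used in the diff; it is an illustration of the trade-off, not the PR's resolution.

```java
// Hypothetical sketch of a strict "is deleted" check: only an HTTP 404 counts.
final class DeletionCheck {
    static final int NOT_FOUND = 404;

    static boolean isDeleted(ConnectorStatusClient client, String connectorName) {
        try {
            client.connectorStatus(connectorName);
            // Any successful response, even a null body, means the connector
            // is still visible through the REST API.
            return false;
        } catch (StatusException e) {
            return e.statusCode() == NOT_FOUND;
        } catch (RuntimeException e) {
            // Unexpected failures are treated as "not confirmed deleted yet",
            // so a polling loop would simply try again.
            return false;
        }
    }

    public static void main(String[] args) {
        ConnectorStatusClient gone = name -> { throw new StatusException(NOT_FOUND); };
        ConnectorStatusClient nullBody = name -> null;
        System.out.println(isDeleted(gone, "blocking-connector"));     // true
        System.out.println(isDeleted(nullBody, "blocking-connector")); // false
    }
}

// Hypothetical stand-in for the REST status call used in the test assertions.
interface ConnectorStatusClient {
    String connectorStatus(String connectorName) throws StatusException;
}

// Hypothetical exception carrying the HTTP status code of a failed REST call.
final class StatusException extends Exception {
    private final int statusCode;

    StatusException(int statusCode) {
        super("HTTP " + statusCode);
        this.statusCode = statusCode;
    }

    int statusCode() { return statusCode; }
}
```

With this variant, a `null` body would make the wait time out instead of passing, which surfaces the case C0urante asks about rather than silently accepting it.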
[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
junrao commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r659914342 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -90,11 +90,63 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { -// first do a pass through the files in the log directory and remove any temporary files + +// First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) -// Now do a second pass and load all the log and index files. +// The remaining valid swap files must come from compaction or segment split operation. We can +// simply rename them to regular segment files. But, before renaming, we should figure out which +// segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. +// We store segments that require renaming in this code block, and do the actual renaming later. +var minSwapFileOffset = Long.MaxValue +var maxSwapFileOffset = Long.MinValue +val toRenameSwapFiles = mutable.Set[File]() +swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f => + val baseOffset = offsetFromFile(f) + val segment = LogSegment.open(f.getParentFile, +baseOffset = baseOffset, +params.config, +time = params.time, +fileSuffix = Log.SwapFileSuffix) + toRenameSwapFiles += f + info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.") + minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset) + maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset) Review comment: > I could be wrong, but I think if it is compaction, the last record will never be removed.
The reason is that compaction always removes earlier records of each key, and the last record will never be an earlier one. > > Split should be similar. It's true that we generally don't remove the last record during compaction. However, during a round of cleaning, we clean segments in groups, and each group generates a single .clean file. The groups are formed so that the offsets in a group stay within a 2-billion offset gap and the .clean file won't exceed 2GB in size. If multiple groups are formed, a group that is not the last one may not preserve its last record. > How can we get the next segment before finishing the recovery process? We could potentially scan all .log files and sort them in offset order.
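junrao's last suggestion is to scan all `.log` files and sort them by offset to find the next segment. A minimal sketch working on file names only, assuming the usual Kafka naming of a segment file as its base offset zero-padded to 20 digits (for example `00000000000000000100.log`); `SegmentScan` is a hypothetical helper, not `LogLoader` code, and it deliberately ignores index and `.swap` files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper: derives segment order from file names alone.
final class SegmentScan {
    private static final String LOG_SUFFIX = ".log";

    // Parses base offsets from names like "00000000000000000100.log" and returns
    // them in ascending order. Index files and ".log.swap" files do not end with
    // ".log", so they are skipped.
    static List<Long> sortedBaseOffsets(List<String> fileNames) {
        List<Long> offsets = new ArrayList<>();
        for (String name : fileNames) {
            if (name.endsWith(LOG_SUFFIX)) {
                String prefix = name.substring(0, name.length() - LOG_SUFFIX.length());
                offsets.add(Long.parseLong(prefix));
            }
        }
        Collections.sort(offsets);
        return offsets;
    }

    // Returns the base offset of the first segment strictly after the given offset.
    static OptionalLong nextBaseOffset(List<Long> sortedOffsets, long offset) {
        for (long base : sortedOffsets) {
            if (base > offset) {
                return OptionalLong.of(base);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        List<Long> offsets = sortedBaseOffsets(Arrays.asList(
            "00000000000000000200.log", "00000000000000000000.log",
            "00000000000000000100.index", "00000000000000000100.log",
            "00000000000000000300.log.swap"));
        System.out.println(offsets);                       // [0, 100, 200]
        System.out.println(nextBaseOffset(offsets, 100L)); // OptionalLong[200]
    }
}
```

Because only names are read, this lookup can run before any segment is opened or recovered, which is the ordering problem raised in the quoted question.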
[jira] [Commented] (KAFKA-12958) Add simulation invariant for leadership and snapshot
[ https://issues.apache.org/jira/browse/KAFKA-12958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370679#comment-17370679 ] Jose Armando Garcia Sancio commented on KAFKA-12958: Thanks [~zhaohaidao]. I'll try to take a look at your PR today. Not sure if this is what is happening, but the `KafkaRaftClient` may have multiple registered `RaftClient.Listener`s. If some of them have not been notified of their leadership through `handleLeaderChange`, it is okay for them to see calls to `handleSnapshot`. > Add simulation invariant for leadership and snapshot > > > Key: KAFKA-12958 > URL: https://issues.apache.org/jira/browse/KAFKA-12958 > Project: Kafka > Issue Type: Sub-task >Reporter: Jose Armando Garcia Sancio >Assignee: HaiyuanZhao >Priority: Major > Attachments: image-2021-06-27-02-09-25-296.png, > image-2021-06-27-02-15-23-760.png, image-2021-06-27-02-26-48-368.png, > image-2021-06-27-02-27-41-966.png > > > During the simulation we should add an invariant that notified leaders are > never asked to load snapshots. The state machine always sees the following > sequence of callback calls: > Leaders see: > ... > handleLeaderChange: the state machine is notified of leadership > handleSnapshot is never called > Non-leaders see: > ... > handleLeaderChange: the state machine is notified that it is not the leader > handleSnapshot is called 0 or more times
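The invariant in the issue, combined with Jose's note that a `KafkaRaftClient` may have several registered listeners, suggests tracking leadership per listener rather than per client. A minimal sketch of such a checker for a simulation, assuming listeners are identified by a string; `LeaderSnapshotInvariant`, `onLeaderChange`, and `onSnapshot` are hypothetical names and do not mirror the `RaftClient.Listener` method signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-listener invariant checker: once a listener has been told it
// is the leader, it must not be asked to load a snapshot.
final class LeaderSnapshotInvariant {
    private final Map<String, Boolean> notifiedLeader = new HashMap<>();

    // Records the latest leadership notification delivered to this listener.
    void onLeaderChange(String listenerId, boolean isLeader) {
        notifiedLeader.put(listenerId, isLeader);
    }

    // Listeners never notified, or notified that they are not the leader,
    // may receive any number of snapshot callbacks.
    void onSnapshot(String listenerId) {
        if (notifiedLeader.getOrDefault(listenerId, false)) {
            throw new IllegalStateException("Listener " + listenerId
                + " was notified of leadership but asked to load a snapshot");
        }
    }

    public static void main(String[] args) {
        LeaderSnapshotInvariant invariant = new LeaderSnapshotInvariant();
        invariant.onLeaderChange("metadata-listener", true);
        invariant.onSnapshot("other-listener"); // allowed: not notified of leadership
        try {
            invariant.onSnapshot("metadata-listener");
        } catch (IllegalStateException e) {
            System.out.println("violation: " + e.getMessage());
        }
    }
}
```

Keying the state by listener means a snapshot delivered to a listener that has not yet seen `handleLeaderChange` does not trip the check, matching the case Jose describes.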
[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix
guozhangwang commented on a change in pull request #10917: URL: https://github.com/apache/kafka/pull/10917#discussion_r659908813 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -186,12 +190,17 @@ public void process(final K key, final V1 value) { @SuppressWarnings("unchecked") private void emitNonJoinedOuterRecords(final WindowStore, LeftOrRightValue> store) { +if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) { +return; +} + try (final KeyValueIterator>, LeftOrRightValue> it = store.all()) { while (it.hasNext()) { final KeyValue>, LeftOrRightValue> record = it.next(); final Windowed> windowedKey = record.key; final LeftOrRightValue value = record.value; +minTime.minTime = windowedKey.window().start(); Review comment: Instead of trying to update minTime on each record, could we just set it once in line 207 below, plus setting it to MAX if we've exhausted all records (as @spena indicated above)? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ## @@ -69,6 +69,10 @@ public long get() { } } +static class MinTime { Review comment: Could we merge this and MaxObservedStreamTime into a single class to be shared among operator nodes?
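Guozhang's suggestion is to set `minTime` once, where the scan stops, and to reset it to the maximum value when every record has been emitted, instead of updating it per record. A minimal sketch of that pattern, assuming a `TreeMap` keyed by timestamp (standing in for the window start) replaces the `WindowStore`, with one record per timestamp; `OuterJoinExpirySketch` is hypothetical, and lowering `minTime` on `put` is an assumption added so new records re-enable the scan. The early-return condition has the same shape as the one in the diff.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the outer-join expiry scan. A TreeMap keyed by
// timestamp stands in for the window store (one record per timestamp).
final class OuterJoinExpirySketch {
    private final TreeMap<Long, String> store = new TreeMap<>();
    private final long joinBeforeMs;
    private final long joinAfterMs;
    private final long joinGraceMs;
    private long maxObservedStreamTime = Long.MIN_VALUE;
    private long minTime = Long.MAX_VALUE; // earliest timestamp still in the store

    OuterJoinExpirySketch(long joinBeforeMs, long joinAfterMs, long joinGraceMs) {
        this.joinBeforeMs = joinBeforeMs;
        this.joinAfterMs = joinAfterMs;
        this.joinGraceMs = joinGraceMs;
    }

    void put(long timestamp, String value) {
        store.put(timestamp, value);
        maxObservedStreamTime = Math.max(maxObservedStreamTime, timestamp);
        // Assumption: a new record can lower the minimum, re-enabling the scan.
        minTime = Math.min(minTime, timestamp);
    }

    // Emits (and removes) records that can no longer be joined.
    List<String> emitExpired() {
        List<String> emitted = new ArrayList<>();
        if (store.isEmpty()) {
            return emitted; // also avoids computing a cutoff before any record arrives
        }
        long cutoff = maxObservedStreamTime - joinAfterMs - joinBeforeMs - joinGraceMs;
        // Skip the scan entirely when the earliest stored record cannot have expired.
        if (minTime >= cutoff) {
            return emitted;
        }
        Iterator<Map.Entry<Long, String>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() >= cutoff) {
                minTime = entry.getKey(); // set once, where the scan stops
                return emitted;
            }
            emitted.add(entry.getValue());
            it.remove();
        }
        minTime = Long.MAX_VALUE; // all records were emitted
        return emitted;
    }

    long minTime() { return minTime; }

    public static void main(String[] args) {
        OuterJoinExpirySketch join = new OuterJoinExpirySketch(10, 10, 0);
        join.put(0, "a");
        join.put(5, "b");
        System.out.println(join.emitExpired()); // [] (cutoff is -15)
        join.put(30, "c");
        System.out.println(join.emitExpired()); // [a, b] (cutoff is 10)
        System.out.println(join.minTime());     // 30
    }
}
```

Since the store is ordered, the first non-expired entry is also the earliest remaining one, so a single assignment at the stopping point is enough to keep `minTime` accurate in this model.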
[GitHub] [kafka] jlprat commented on pull request #10934: [DO NOT MERGE] Scala3 test
jlprat commented on pull request #10934: URL: https://github.com/apache/kafka/pull/10934#issuecomment-869779558 > Thanks for the PR. If I understand correctly, this assumes we would drop Scala 2.12 compatibility. That makes sense, but it does mean waiting quite a while (12+ months). Shall I try to work on supporting the 3 Scala versions?
[GitHub] [kafka] ijuma commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)
ijuma commented on pull request #10760: URL: https://github.com/apache/kafka/pull/10760#issuecomment-869776967 Thanks for the quick investigation!
[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)
dajac commented on pull request #10760: URL: https://github.com/apache/kafka/pull/10760#issuecomment-869772020 We have found the issue. @thomaskwscott is working on the fix. My bad, I missed one case.
[jira] [Commented] (KAFKA-13002) dev branch Streams not able to fetch end offsets from pre-3.0 brokers
[ https://issues.apache.org/jira/browse/KAFKA-13002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370653#comment-17370653 ] Tom Scott commented on KAFKA-13002: --- I think I've determined the problem; I'll get a PR together ASAP. > dev branch Streams not able to fetch end offsets from pre-3.0 brokers > - > > Key: KAFKA-13002 > URL: https://issues.apache.org/jira/browse/KAFKA-13002 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: John Roesler >Assignee: Tom Scott >Priority: Blocker > Fix For: 3.0.0 > > Attachments: soaks.png > > > Note: this is not a report against a released version of AK. It seems to be a > problem on the trunk development branch only. > After deploying our soak test against `trunk/HEAD` on Friday, I noticed that > Streams is no longer processing: > !soaks.png! > I found this stacktrace in the logs during startup: > {code:java} > 5075 [2021-06-25T16:50:44-05:00] > (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) > [2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The > listOffsets request failed. > (org.apache.kafka.streams.processor.internals.ClientUtils) > 5076 [2021-06-25T16:50:44-05:00] > (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnsupportedVersionException: The broker does > not support LIST_OFFSETS with version in range [7,7]. The supported > range is [0,6].
> 5077 at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > 5078 at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > 5079 at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > 5080 at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > 5081 at > org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147) > 5082 at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643) > 5083 at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579) > 5084 at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387) > 5085 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) > 5086 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689) > 5087 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111) > 5088 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593) > 5089 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556) > 5090 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178) > 5091 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153) > 5092 at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) > 5093 at > 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) > 5094 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) > 5095 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) > 5096 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) > 5097 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > 5098 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > 5099 at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297) > 5100 at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) > 5101 at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) > 5102 at >
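The root error in this trace is a version mismatch: the client would only send LIST_OFFSETS at version 7, while the pre-3.0 broker supports versions 0 through 6, so the two ranges do not overlap. A minimal sketch of that range check, assuming the client picks the highest version supported by both sides; `ApiVersionNegotiation` is a hypothetical helper, not Kafka's client networking code.

```java
import java.util.OptionalInt;

// Hypothetical helper: resolves a request version from the client's and broker's ranges.
final class ApiVersionNegotiation {
    // Picks the highest version supported by both sides, or empty if the ranges
    // do not overlap (the situation reported as UnsupportedVersionException).
    static OptionalInt highestCommonVersion(int clientMin, int clientMax,
                                            int brokerMin, int brokerMax) {
        int low = Math.max(clientMin, brokerMin);
        int high = Math.min(clientMax, brokerMax);
        return high >= low ? OptionalInt.of(high) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Client range [7,7] against broker range [0,6]: no common version.
        System.out.println(highestCommonVersion(7, 7, 0, 6)); // OptionalInt.empty
        // A client that can also fall back to older versions would use v6.
        System.out.println(highestCommonVersion(0, 7, 0, 6)); // OptionalInt[6]
    }
}
```

In this model, any request whose minimum version is pinned above what an older broker supports fails outright instead of falling back, which matches the "[7,7]" versus "[0,6]" message.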
[jira] [Assigned] (KAFKA-13002) dev branch Streams not able to fetch end offsets from pre-3.0 brokers
[ https://issues.apache.org/jira/browse/KAFKA-13002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-13002: --- Assignee: Tom Scott (was: David Jacot)
[GitHub] [kafka] guozhangwang commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs
guozhangwang commented on pull request #10930: URL: https://github.com/apache/kafka/pull/10930#issuecomment-869764069 @rajinisivaram I think my confusion comes from `ReplicaFetcherThread throws OffsetOutOfRangeException when processing responses with Errors.NONE if the leader's offsets in the response are out of range and this moves the partition to failed state.` Could you point me to the code where this is currently happening? Also, I'm wondering: since we are now fixing the logic on the leader, if this logic does exist, do we still need it?
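The PR title describes the leader-side change: return OFFSET_OUT_OF_RANGE when `fetchOffset < startOffset`, even if the follower's epoch also diverges. A minimal sketch of that check ordering, assuming everything else about fetch handling is unchanged; `FetchValidation` and `FetchOutcome` are hypothetical, and the enum collapses the fetch response into a single outcome for illustration only (it is not Kafka's `Errors` type).

```java
// Hypothetical sketch of the check ordering described in the PR title.
final class FetchValidation {
    static FetchOutcome validate(long fetchOffset, long logStartOffset, boolean epochDiverges) {
        // Checked first, so a fetch below the log start offset is reported as
        // out of range even when the follower's epoch also diverges.
        if (fetchOffset < logStartOffset) {
            return FetchOutcome.OFFSET_OUT_OF_RANGE;
        }
        // Otherwise the leader reports the divergence so the follower can truncate.
        if (epochDiverges) {
            return FetchOutcome.DIVERGING_EPOCH;
        }
        return FetchOutcome.NONE;
    }

    public static void main(String[] args) {
        System.out.println(validate(5, 10, true));   // OFFSET_OUT_OF_RANGE
        System.out.println(validate(15, 10, true));  // DIVERGING_EPOCH
        System.out.println(validate(15, 10, false)); // NONE
    }
}

// Hypothetical outcome type for illustration only.
enum FetchOutcome { NONE, OFFSET_OUT_OF_RANGE, DIVERGING_EPOCH }
```

With the range check done on the leader, a follower would not receive a diverging-epoch response for an offset the leader no longer has, which is the situation Guozhang asks whether the follower-side handling still needs to cover.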