[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660250731



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,62 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-    final Map<String, KafkaFuture<Void>> futures;
+    private Map<String, KafkaFuture<Void>> nameFutures;
+    private Map<Uuid, KafkaFuture<Void>> topicIdFutures;
 
-    protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
+    protected DeleteTopicsResult() {}

Review comment:
   Ah, I remember the issue with that being private: there is a test that extends this class.
   
   > @hachikuji I originally wanted the constructor for DeleteTopicsResult to 
be private, but InternalTopicManagerTest required creating a subclass.
   
   https://github.com/apache/kafka/pull/10892#issuecomment-862876282
   
   The issue with just using the static method is that it is protected in the class. So the options are either to keep things protected (as they are now), make the static method public, or change the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13005) Support JBOD in kraft mode

2021-06-28 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371005#comment-17371005
 ] 

dengziming commented on KAFKA-13005:


This is currently blocked by KAFKA-9837; I will do this. ;)

> Support JBOD in kraft mode
> --
>
> Key: KAFKA-13005
> URL: https://issues.apache.org/jira/browse/KAFKA-13005
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13005) Support JBOD in kraft mode

2021-06-28 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-13005:
--

Assignee: dengziming

> Support JBOD in kraft mode
> --
>
> Key: KAFKA-13005
> URL: https://issues.apache.org/jira/browse/KAFKA-13005
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
>






[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-06-28 Thread GitBox


guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-870174294


   You were referring to this commit 
https://github.com/apache/kafka/pull/10798/commits/68a947c0eb6a5cc4bdde24083c83f4638e708edb
 for the tweaks, right?
   
   BTW, it's a bit interesting to see that the improvement for range is around 
1.442 / 1.131 ≈ 1.27, while for putAll it is 1.065 / 0.919 ≈ 1.16. Given that 
the key lengths are similar in the benchmarks, I was expecting the latter to 
have a bigger benefit. @cadonna WDYT?






[GitHub] [kafka] jsancio commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

2021-06-28 Thread GitBox


jsancio commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660227382



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -100,4 +116,21 @@ public void close() {
 new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, 
maxBatchSize)
 );
 }
+
+/**
+ * Returns the next non-control Batch
+ */
+    private Optional<Batch<T>> nextBatch() {
+        while (iterator.hasNext()) {
+            Batch<T> batch = iterator.next();
+
+            if (batch.records().isEmpty()) {
+                continue;
+            } else {
+                return Optional.of(batch);
+            }

Review comment:
   ```suggestion
   if (!batch.records().isEmpty()) {
       return Optional.of(batch);
   }
   ```
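   For context, a sketch of how the whole helper reads with the inverted check applied (assembled from the hunk above; the trailing `Optional.empty()` is assumed from the method's return type, since the quoted hunk ends before the loop does):

```java
    /**
     * Returns the next non-control Batch.
     */
    private Optional<Batch<T>> nextBatch() {
        while (iterator.hasNext()) {
            Batch<T> batch = iterator.next();

            // Skip batches whose record list is empty (e.g. control-only batches).
            if (!batch.records().isEmpty()) {
                return Optional.of(batch);
            }
        }
        return Optional.empty();
    }
```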








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660228674



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -471,16 +512,26 @@ class IncrementalFetchContext(private val time: Time,
   if (session.epoch != expectedEpoch) {
 info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
   s"got ${session.epoch}.  Possible duplicate request.")
-FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
   } else {
+var topLevelError = Errors.NONE
 // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
+// It will also set the top-level error to INCONSISTENT_TOPIC_ID if 
any partitions had this error.
 val partitionIter = new PartitionIterator(updates.entrySet.iterator, 
true)
 while (partitionIter.hasNext) {
-  partitionIter.next()
+  val entry = partitionIter.next()
+  if (entry.getValue.errorCode() == 
Errors.INCONSISTENT_TOPIC_ID.code()) {

Review comment:
   The topic ID should not change in the log once it is set. I think what you 
said in the last sentence is correct. My understanding is that once the log is 
closed, we can no longer read from it.








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660228129



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -296,11 +276,24 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 // may not be any partitions at all in the response.  For this reason, 
the top-level error code
 // is essential for them.
 Errors error = Errors.forException(e);
-        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
-        for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
-            responseData.put(entry.getKey(), FetchResponse.partitionResponse(entry.getKey().partition(), error));
+        List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
+        // Since UNKNOWN_TOPIC_ID is a new error type only returned when topic ID requests are made (from newer clients),
+        // we can skip returning the error on all partitions and avoid returning any partitions at all.
+if (error != Errors.UNKNOWN_TOPIC_ID) {

Review comment:
   Yeah. I agree it is a bit weird. We can update as you mentioned.








[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660211602



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -471,16 +512,26 @@ class IncrementalFetchContext(private val time: Time,
   if (session.epoch != expectedEpoch) {
 info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
   s"got ${session.epoch}.  Possible duplicate request.")
-FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
   } else {
+var topLevelError = Errors.NONE
 // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
+// It will also set the top-level error to INCONSISTENT_TOPIC_ID if 
any partitions had this error.
 val partitionIter = new PartitionIterator(updates.entrySet.iterator, 
true)
 while (partitionIter.hasNext) {
-  partitionIter.next()
+  val entry = partitionIter.next()
+  if (entry.getValue.errorCode() == 
Errors.INCONSISTENT_TOPIC_ID.code()) {

Review comment:
   > I'm still not sure I follow "pending fetch request could still reference 
the outdated Partition object and therefore miss the topicId change". My 
understanding is that the log is the source of truth and we will read from the 
log if it matches and not read if it doesn't. I see we could get an error 
erroneously if the partition didn't update in time, but I don't see us being 
able to read from the log due to a stale partition.
   > 
   > Or are you referring to the getPartitionOrException(tp) call picking up a 
stale partition and both the request and the partition are stale? In this case, 
we will read from the log, but will identify it with its correct ID. The client 
will handle based on this.
   
   A fetch request may pass the topicId check in ReplicaManager and be about to 
call log.read() when the topicId changes. I was wondering whether, in that 
case, log.read() could return data that corresponds to the old topicId. It 
seems that's not possible, since Log.close() closes all segments.

##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -354,38 +377,55 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param cache  The fetch session cache.
   * @param reqMetadataThe request metadata.
   * @param fetchData  The partition data from the fetch request.
+  * @param usesTopicIds   True if this session should use topic IDs.
+  * @param topicIds   The map from topic names to topic IDs.
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val reqMetadata: JFetchMetadata,
private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+   private val usesTopicIds: Boolean,
+   private val topicIds: util.Map[String, Uuid],
private val isFromFollower: Boolean) extends 
FetchContext {
   override def getFetchOffset(part: TopicPartition): Option[Long] =
 Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-fetchData.forEach(fun(_, _))
+  override def foreachPartition(fun: (TopicPartition, Uuid, 
FetchRequest.PartitionData) => Unit): Unit = {
+fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse = {
-def createNewSession: FetchSession.CACHE_MAP = {
+var topLevelError = Errors.NONE
+def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) 
= {
   val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+  val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
   updates.forEach { (part, respData) =>
+if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) {
+  info(s"Session encountered an inconsistent topic ID for 
topicPartition $part.")
+  topLevelError = Errors.INCONSISTENT_TOPIC_ID
+}
 val reqData = fetchData.get(part)
-cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+val id = topicIds.getOrDefault(part.topic(), 

[GitHub] [kafka] jacky1193610322 commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

2021-06-28 Thread GitBox


jacky1193610322 commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r660204460



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+
+/**
+ * Represents changes to the topics in the metadata image.
+ */
+public final class TopicsDelta {
+private final TopicsImage image;
+
+/**
+ * A map from topic IDs to the topic deltas for each topic. Topics which 
have been
+ * deleted will not appear in this map.
+ */
+    private final Map<Uuid, TopicDelta> changedTopics = new HashMap<>();
+
+/**
+ * The IDs of topics that exist in the image but that have been deleted. 
Note that if
+ * a topic does not exist in the image, it will also not exist in this 
set. Topics
+ * that are created and then deleted within the same delta will leave no 
trace.
+ */
+    private final Set<Uuid> deletedTopicIds = new HashSet<>();

Review comment:
   I'm sorry too. I have no question if we always replay each record exactly 
once, even if the broker crashes.








[GitHub] [kafka] jsancio commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

2021-06-28 Thread GitBox


jsancio commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660209060



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##
@@ -654,14 +657,69 @@ private static void writeLeaderChangeMessage(ByteBuffer 
buffer,
  long timestamp,
  int leaderEpoch,
  LeaderChangeMessage 
leaderChangeMessage) {
+try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+TimestampType.CREATE_TIME, initialOffset, timestamp,
+RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
+false, true, leaderEpoch, buffer.capacity())
+) {
+builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+}
+}
+
+public static MemoryRecords withSnapshotHeaderRecord(
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+ByteBuffer buffer,
+SnapshotHeaderRecord snapshotHeaderRecord
+) {
+writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotHeaderRecord);
+buffer.flip();
+return MemoryRecords.readableRecords(buffer);
+}
+
+private static void writeSnapshotHeaderRecord(ByteBuffer buffer,
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+SnapshotHeaderRecord snapshotHeaderRecord
+) {
+try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+TimestampType.CREATE_TIME, initialOffset, timestamp,
+RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
+false, true, leaderEpoch, buffer.capacity())
+) {
+builder.appendSnapshotHeaderMessage(timestamp, 
snapshotHeaderRecord);
+}
+}
+
+public static MemoryRecords withSnapshotFooterRecord(
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+ByteBuffer buffer,
+SnapshotFooterRecord snapshotFooterRecord
+) {
+writeSnapshotFooterRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotFooterRecord);
+buffer.flip();
+return MemoryRecords.readableRecords(buffer);
+}
+
+private static void writeSnapshotFooterRecord(ByteBuffer buffer,
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+SnapshotFooterRecord snapshotFooterRecord
+) {
 MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
 buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
 TimestampType.CREATE_TIME, initialOffset, timestamp,
 RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
 false, true, leaderEpoch, buffer.capacity()
 );
-builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
 builder.close();

Review comment:
   We should use Java's try-with-resources here.
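   For illustration, a sketch of the footer writer with try-with-resources, mirroring the `writeSnapshotHeaderRecord` variant earlier in this hunk (all names and arguments are taken from the quoted diff):

```java
    private static void writeSnapshotFooterRecord(ByteBuffer buffer,
                                                  long initialOffset,
                                                  long timestamp,
                                                  int leaderEpoch,
                                                  SnapshotFooterRecord snapshotFooterRecord
    ) {
        // try-with-resources guarantees builder.close() runs even if the append throws
        try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
            buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
            TimestampType.CREATE_TIME, initialOffset, timestamp,
            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
            false, true, leaderEpoch, buffer.capacity())
        ) {
            builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
        }
    }
```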








[jira] [Resolved] (KAFKA-12631) Support api to resign raft leadership

2021-06-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12631.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> Support api to resign raft leadership
> -
>
> Key: KAFKA-12631
> URL: https://issues.apache.org/jira/browse/KAFKA-12631
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> It is useful to allow the controller to explicitly resign after encountering 
> an error of some kind. The Raft state machine implements a Resigned state, 
> but it is only currently used during graceful shutdown.
> This work depends on both of the following jiras:
> - KAFKA-12342: Adds resign() api after merging MetaLogManager interface
> - KAFKA-12607: Adds support for granting votes while in the Resigned state





[GitHub] [kafka] hachikuji merged pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


hachikuji merged pull request #10913:
URL: https://github.com/apache/kafka/pull/10913


   






[GitHub] [kafka] jacky1193610322 commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

2021-06-28 Thread GitBox


jacky1193610322 commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r660204460



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+
+/**
+ * Represents changes to the topics in the metadata image.
+ */
+public final class TopicsDelta {
+private final TopicsImage image;
+
+/**
+ * A map from topic IDs to the topic deltas for each topic. Topics which 
have been
+ * deleted will not appear in this map.
+ */
+    private final Map<Uuid, TopicDelta> changedTopics = new HashMap<>();
+
+/**
+ * The IDs of topics that exist in the image but that have been deleted. 
Note that if
+ * a topic does not exist in the image, it will also not exist in this 
set. Topics
+ * that are created and then deleted within the same delta will leave no 
trace.
+ */
+    private final Set<Uuid> deletedTopicIds = new HashSet<>();

Review comment:
   I'm sorry too. I have no problem if we always replay each record exactly 
once, even if the broker crashes.








[GitHub] [kafka] cmccabe commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

2021-06-28 Thread GitBox


cmccabe commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660199695



##
File path: 
clients/src/main/resources/common/message/MetadataSnapshotFooterRecord.json
##
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "MetadataSnapshotFooterRecord",

Review comment:
   Perhaps. However, if we needed a specific metadata header or footer, 
we'd do that as a metadata record.








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660199209



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -277,14 +277,18 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 // is essential for them.
 Errors error = Errors.forException(e);
         List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
-        data.topics().forEach(topic -> {
-            List<FetchResponseData.PartitionData> partitionResponses = topic.partitions().stream().map(partition ->
-                FetchResponse.partitionResponse(partition.partition(), error)).collect(Collectors.toList());
-            topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
-                .setTopic(topic.topic())
-                .setTopicId(topic.topicId())
-                .setPartitions(partitionResponses));
-        });
+// Since UNKNOWN_TOPIC_ID is a new error type only returned when topic 
ID requests are made (from newer clients),

Review comment:
   We need to do something like this to easily get the top level error with 
no partition response for UNKNOWN_TOPIC_ID. I think this works, but we may want 
a version check as well just to be safe.
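   As a rough sketch of the version-guarded variant being suggested (not what the PR currently does; the `13` below stands in for the first Fetch version that carries topic IDs, and the surrounding names come from the quoted hunk):

```java
        Errors error = Errors.forException(e);
        List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
        // Only omit the per-partition responses when the failure is an unknown
        // topic ID and the client is new enough to understand topic IDs.
        boolean skipPartitions = error == Errors.UNKNOWN_TOPIC_ID && version() >= 13;
        if (!skipPartitions) {
            data.topics().forEach(topic -> {
                List<FetchResponseData.PartitionData> partitionResponses = topic.partitions().stream()
                    .map(partition -> FetchResponse.partitionResponse(partition.partition(), error))
                    .collect(Collectors.toList());
                topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
                    .setTopic(topic.topic())
                    .setTopicId(topic.topicId())
                    .setPartitions(partitionResponses));
            });
        }
```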








[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r660196562



##
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##
@@ -1472,11 +1489,30 @@ class LogCleanerTest {
 time.scheduler.clear()
 cleanedKeys = LogTestUtils.keysInLog(log)
 
-// 3) Simulate recovery after swap file is created and old segments files 
are renamed
+// 4) Simulate recovery after swap file is created and old segments files 
are renamed
 //to .deleted. Clean operation is resumed during recovery.
 log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
 log = recoverAndCheck(config, cleanedKeys)
 
+// add some more messages and clean the log again
+while (log.numberOfSegments < 10) {
+  log.appendAsLeader(record(log.logEndOffset.toInt, 
log.logEndOffset.toInt), leaderEpoch = 0)
+  messageCount += 1
+}
+for (k <- 1 until messageCount by 2)
+  offsetMap.put(key(k), Long.MaxValue)
+cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, 
new CleanerStats(),
+  new CleanedTransactionMetadata)
+// clear scheduler so that async deletes don't run
+time.scheduler.clear()
+cleanedKeys = LogTestUtils.keysInLog(log)
+
+// 5) Simulate recovery after a subset of swap files are renamed to 
regular files and old segments files are renamed
+//to .deleted. Clean operation is resumed during recovery.
+log.logSegments.head.timeIndex.file.renameTo(new 
File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", 
Log.SwapFileSuffix)))
+  // .changeFileSuffixes("", Log.SwapFileSuffix)

Review comment:
   My bad. Forgot to delete this line. 








[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660193316



##
File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collection;
+
+/**
+ * A class used to represent a collection of topics. This collection may 
define topics by topic name
+ * or topic ID. Subclassing this class beyond the classes provided here is not 
supported.
+ */
+public abstract class TopicCollection {

Review comment:
   I think it's simpler with just the factories, so I like this idea.








[GitHub] [kafka] hachikuji commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


hachikuji commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660179959



##
File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collection;
+
+/**
+ * A class used to represent a collection of topics. This collection may 
define topics by topic name
+ * or topic ID. Subclassing this class beyond the classes provided here is not 
supported.
+ */
+public abstract class TopicCollection {

Review comment:
   No strong opinion, but I'd probably vote to keep the constructors 
private. Might be worth getting a second opinion. @ijuma ?
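   For reference, a minimal sketch of the private-constructor shape being discussed (illustrative only, not the final API; only the name-based collection is shown, and an ID-based `TopicIdCollection` would mirror it with `Uuid`):

```java
import java.util.Collection;

public abstract class TopicCollection {

    // Private constructor: only the nested classes below can extend this type,
    // so subclassing outside this file is impossible.
    private TopicCollection() {}

    public static TopicNameCollection ofTopicNames(Collection<String> topicNames) {
        return new TopicNameCollection(topicNames);
    }

    /** A collection of topics identified by their names. */
    public static class TopicNameCollection extends TopicCollection {
        private final Collection<String> topicNames;

        private TopicNameCollection(Collection<String> topicNames) {
            this.topicNames = topicNames;
        }

        public Collection<String> topicNames() {
            return topicNames;
        }
    }
}
```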








[GitHub] [kafka] hachikuji commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


hachikuji commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660178840



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,62 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-    final Map<String, KafkaFuture<Void>> futures;
+    private Map<String, KafkaFuture<Void>> nameFutures;
+    private Map<Uuid, KafkaFuture<Void>> topicIdFutures;
 
-    protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
+    protected DeleteTopicsResult() {}

Review comment:
   Yeah, right. As long as the constructor is private, you can ensure one 
of them must be null.
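   A sketch of that invariant (field types assumed from the diff above; simplified, not the actual class):

```java
import java.util.Map;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Uuid;

public class DeleteTopicsResult {
    // Exactly one of these is non-null, depending on which factory was used.
    private final Map<String, KafkaFuture<Void>> nameFutures;
    private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;

    // Private constructor: callers must go through the factories below.
    private DeleteTopicsResult(Map<String, KafkaFuture<Void>> nameFutures,
                               Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
        this.nameFutures = nameFutures;
        this.topicIdFutures = topicIdFutures;
    }

    static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
        return new DeleteTopicsResult(nameFutures, null);
    }

    static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
        return new DeleteTopicsResult(null, topicIdFutures);
    }
}
```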








[jira] [Commented] (KAFKA-12975) Consider how Topic IDs can improve consume experience

2021-06-28 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370944#comment-17370944
 ] 

Jason Gustafson commented on KAFKA-12975:
-

I think the main guarantee we want is that offsets cannot be mistakenly reused 
on a recreated topic. Once a topic is gone, all its offsets (persistent or 
otherwise) need to go with it. Ideally the consumer would get a notification in 
the rebalance listener that the topic was recreated so that the user has a 
chance to set the initial position. As a starting place, we can associate the 
in-memory fetch position in the consumer with the topicId that is being 
fetched. Then if we find the topic ID is deleted, we should delete the position 
in memory and consult the reset policy.
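
A rough sketch of that idea (all names here are hypothetical, purely to illustrate tying the in-memory position to a topic ID and falling back to the reset policy when the ID no longer matches):

```java
import java.util.Optional;
import org.apache.kafka.common.Uuid;

// Hypothetical illustration only, not actual consumer internals.
class TopicIdAwarePosition {
    final long offset;
    final Uuid topicId;  // ID of the topic instance the offset was fetched against

    TopicIdAwarePosition(long offset, Uuid topicId) {
        this.offset = offset;
        this.topicId = topicId;
    }

    /**
     * Returns this position if it still refers to the same topic instance, or
     * empty so the caller can apply the offset reset policy after the topic
     * was deleted and recreated under the same name.
     */
    Optional<TopicIdAwarePosition> validateAgainst(Uuid currentTopicId) {
        return topicId.equals(currentTopicId) ? Optional.of(this) : Optional.empty();
    }
}
```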

> Consider how Topic IDs can improve consume experience
> -
>
> Key: KAFKA-12975
> URL: https://issues.apache.org/jira/browse/KAFKA-12975
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> Currently when a consumer subscribes to a topic, it will continue to consume 
> from this topic across topic deletions and recreations with the same name. By 
> adding topic IDs to the consumer, we will have more insight for these events. 
> We should figure out if we want to change consumer logic now that we have 
> this information.





[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

2021-06-28 Thread GitBox


IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-870107728


   Thanks @dajac for review and comments!
   Updated the PR taking them into account. 






[jira] [Updated] (KAFKA-12975) Consider how Topic IDs can improve consume experience

2021-06-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12975:

Parent: KAFKA-8872
Issue Type: Sub-task  (was: Improvement)

> Consider how Topic IDs can improve consume experience
> -
>
> Key: KAFKA-12975
> URL: https://issues.apache.org/jira/browse/KAFKA-12975
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> Currently when a consumer subscribes to a topic, it will continue to consume 
> from this topic across topic deletions and recreations with the same name. By 
> adding topic IDs to the consumer, we will have more insight for these events. 
> We should figure out if we want to change consumer logic now that we have 
> this information.





[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-28 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r660173021



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+// Some topic partitions get valid OffsetAndMetadata values, others get 
null values (negative integers), and others aren't defined
+val commitedOffsets = Map(
+  testTopicPartition1 -> new OffsetAndMetadata(100),
+  testTopicPartition2 -> null,
+  testTopicPartition3 -> new OffsetAndMetadata(100),
+  testTopicPartition4 -> new OffsetAndMetadata(100),
+  testTopicPartition5 -> null,
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+val consumerGroupDescription = new ConsumerGroupDescription(group,
+  true,
+  Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+  classOf[RangeAssignor].getName,
+  ConsumerGroupState.STABLE,
+  new Node(1, "localhost", 9092))
+
+def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): 
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+  topicPartitionOffsets => topicPartitionOffsets != null && 
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+}
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+when(admin.listOffsets(
+  ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+  any()
+)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.asJava))

Review comment:
   Sure, will add  








[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-28 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r660172970



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+// Some topic partitions get valid OffsetAndMetadata values, others get 
null values (negative integers), and others aren't defined
+val commitedOffsets = Map(
+  testTopicPartition1 -> new OffsetAndMetadata(100),
+  testTopicPartition2 -> null,
+  testTopicPartition3 -> new OffsetAndMetadata(100),
+  testTopicPartition4 -> new OffsetAndMetadata(100),
+  testTopicPartition5 -> null,
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+val consumerGroupDescription = new ConsumerGroupDescription(group,
+  true,
+  Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+  classOf[RangeAssignor].getName,
+  ConsumerGroupState.STABLE,
+  new Node(1, "localhost", 9092))
+
+def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): 
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+  topicPartitionOffsets => topicPartitionOffsets != null && 
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+}
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+when(admin.listOffsets(
+  ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+  any()
+)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.asJava))
+when(admin.listOffsets(
+  ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+  any()
+)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.asJava))

Review comment:
    








[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-28 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r660172772



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+// Some topic partitions get valid OffsetAndMetadata values, others get 
null values (negative integers), and others aren't defined
+val commitedOffsets = Map(
+  testTopicPartition1 -> new OffsetAndMetadata(100),
+  testTopicPartition2 -> null,
+  testTopicPartition3 -> new OffsetAndMetadata(100),
+  testTopicPartition4 -> new OffsetAndMetadata(100),
+  testTopicPartition5 -> null,
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+val consumerGroupDescription = new ConsumerGroupDescription(group,
+  true,
+  Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+  classOf[RangeAssignor].getName,
+  ConsumerGroupState.STABLE,
+  new Node(1, "localhost", 9092))
+
+def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): 
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+  topicPartitionOffsets => topicPartitionOffsets != null && 
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+}
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+when(admin.listOffsets(
+  ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+  any()
+)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.asJava))
+when(admin.listOffsets(
+  ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+  any()
+)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.asJava))
+
+val (state, assignments) = groupService.collectGroupOffsets(group)
+val returnedOffsets = assignments.map { results =>
+  results.map { assignment =>
+new TopicPartition(assignment.topic.get, assignment.partition.get) -> 
assignment.offset
+  }.toMap
+}.getOrElse(Map.empty)
+// Results should have information for all assigned topic partitions (even 
if there is no offset information at all, because they get filled with None)
+// Results should have information for unassigned topic partitions only if 
there is information about them (including null values)

Review comment:
   You are right, it's not relevant because all partitions have information 
now. Going to remove it.








[jira] [Assigned] (KAFKA-12975) Consider how Topic IDs can improve consume experience

2021-06-28 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-12975:
--

Assignee: (was: Justine Olshan)

> Consider how Topic IDs can improve consume experience
> -
>
> Key: KAFKA-12975
> URL: https://issues.apache.org/jira/browse/KAFKA-12975
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Priority: Major
>
> Currently when a consumer subscribes to a topic, it will continue to consume 
> from this topic across topic deletions and recreations with the same name. By 
> adding topic IDs to the consumer, we will have more insight for these events. 
> We should figure out if we want to change consumer logic now that we have 
> this information.





[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


junrao commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r660164974



##
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##
@@ -1472,11 +1489,30 @@ class LogCleanerTest {
 time.scheduler.clear()
 cleanedKeys = LogTestUtils.keysInLog(log)
 
-// 3) Simulate recovery after swap file is created and old segments files 
are renamed
+// 4) Simulate recovery after swap file is created and old segments files 
are renamed
 //to .deleted. Clean operation is resumed during recovery.
 log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
 log = recoverAndCheck(config, cleanedKeys)
 
+// add some more messages and clean the log again
+while (log.numberOfSegments < 10) {
+  log.appendAsLeader(record(log.logEndOffset.toInt, 
log.logEndOffset.toInt), leaderEpoch = 0)
+  messageCount += 1
+}
+for (k <- 1 until messageCount by 2)
+  offsetMap.put(key(k), Long.MaxValue)
+cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, 
new CleanerStats(),
+  new CleanedTransactionMetadata)
+// clear scheduler so that async deletes don't run
+time.scheduler.clear()
+cleanedKeys = LogTestUtils.keysInLog(log)
+
+// 5) Simulate recovery after a subset of swap files are renamed to 
regular files and old segments files are renamed
+//to .deleted. Clean operation is resumed during recovery.
+log.logSegments.head.timeIndex.file.renameTo(new 
File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", 
Log.SwapFileSuffix)))
+  // .changeFileSuffixes("", Log.SwapFileSuffix)

Review comment:
   Is this still needed?








[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660165327



##
File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collection;
+
+/**
+ * A class used to represent a collection of topics. This collection may 
define topics by topic name
+ * or topic ID. Subclassing this class beyond the classes provided here is not 
supported.
+ */
+public abstract class TopicCollection {

Review comment:
   Static factories in addition to a public constructor? Or make the 
constructor private?








[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660164967



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+public DeleteTopicsResult deleteTopics(final TopicCollection topics,
final DeleteTopicsOptions options) {
+DeleteTopicsResult result;
+if (topics instanceof TopicIdCollection)
+result = DeleteTopicsResult.ofTopicIds(new 
HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), 
options)));
+else if (topics instanceof TopicNameCollection)
+result = DeleteTopicsResult.ofTopicNames(new 
HashMap<>(handleDeleteTopicsUsingNames(((TopicNameCollection) 
topics).topicNames(), options)));
+else
+throw new UnsupportedOperationException("The TopicCollection 
provided did not match any supported classes for deleteTopics.");

Review comment:
   I was trying to remember the name of that error. I agree.








[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660164864



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,62 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private Map<String, KafkaFuture<Void>> nameFutures;
+private Map<Uuid, KafkaFuture<Void>> topicIdFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult() {}
+
+private void setNameFutures(Map<String, KafkaFuture<Void>> nameFutures) {
+this.nameFutures = nameFutures;
+}
+
+private void setTopicIdFutures(Map<Uuid, KafkaFuture<Void>> 
topicIdFutures) {
+this.topicIdFutures = topicIdFutures;
+}
+
+protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
+DeleteTopicsResult result = new DeleteTopicsResult();
+result.setNameFutures(nameFutures);
+return result;
+}
+
+protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
+DeleteTopicsResult result = new DeleteTopicsResult();
+result.setTopicIdFutures(topicIdFutures);
+return result;
+}
+
+/**
+ * Return a map from topic names to futures which can be used to check the 
status of
+ * individual deletions if the deleteTopics request used topic names. 
Otherwise return null.

Review comment:
   I was following the convention from this class before, but I can update.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660164712



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,62 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private Map<String, KafkaFuture<Void>> nameFutures;
+private Map<Uuid, KafkaFuture<Void>> topicIdFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult() {}

Review comment:
   So the idea is that we have two parameters and set one as null in the 
static methods?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


hachikuji commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r660160050



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,62 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private Map<String, KafkaFuture<Void>> nameFutures;
+private Map<Uuid, KafkaFuture<Void>> topicIdFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult() {}
+
+private void setNameFutures(Map<String, KafkaFuture<Void>> nameFutures) {
+this.nameFutures = nameFutures;
+}
+
+private void setTopicIdFutures(Map<Uuid, KafkaFuture<Void>> 
topicIdFutures) {
+this.topicIdFutures = topicIdFutures;
+}
+
+protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
+DeleteTopicsResult result = new DeleteTopicsResult();
+result.setNameFutures(nameFutures);
+return result;
+}
+
+protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
+DeleteTopicsResult result = new DeleteTopicsResult();
+result.setTopicIdFutures(topicIdFutures);
+return result;
+}
+
+/**
+ * Return a map from topic names to futures which can be used to check the 
status of
+ * individual deletions if the deleteTopics request used topic names. 
Otherwise return null.
+ */
+public Map<String, KafkaFuture<Void>> topicNameValues() {
+return nameFutures;
+}
+

Review comment:
   nit: extra newline

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,62 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private Map<String, KafkaFuture<Void>> nameFutures;
+private Map<Uuid, KafkaFuture<Void>> topicIdFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult() {}

Review comment:
   Could this be private? I think it would be a little cleaner to take 
`nameFutures` and `topicIdFutures` in the constructor. That would allow us to 
mark those fields as final.
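
   For reference, a rough sketch of how that constructor-based shape could look, with both
   maps passed to a single private constructor (one of them null) so the fields can be final.
   The value type is assumed to be KafkaFuture<Void>, as elsewhere in this diff; this is an
   illustration, not the PR's final code:

```java
import java.util.Map;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Uuid;

public class DeleteTopicsResult {
    private final Map<String, KafkaFuture<Void>> nameFutures;
    private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;

    // Exactly one of the two maps is expected to be non-null.
    private DeleteTopicsResult(Map<String, KafkaFuture<Void>> nameFutures,
                               Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
        this.nameFutures = nameFutures;
        this.topicIdFutures = topicIdFutures;
    }

    static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
        return new DeleteTopicsResult(nameFutures, null);
    }

    static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
        return new DeleteTopicsResult(null, topicIdFutures);
    }

    public Map<String, KafkaFuture<Void>> topicNameValues() {
        return nameFutures;
    }

    public Map<Uuid, KafkaFuture<Void>> topicIdValues() {
        return topicIdFutures;
    }
}
```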

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,62 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private Map<String, KafkaFuture<Void>> nameFutures;
+private Map<Uuid, KafkaFuture<Void>> topicIdFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult() {}
+
+private void setNameFutures(Map<String, KafkaFuture<Void>> nameFutures) {
+this.nameFutures = nameFutures;
+}
+
+private void setTopicIdFutures(Map<Uuid, KafkaFuture<Void>> 
topicIdFutures) {
+this.topicIdFutures = topicIdFutures;
+}
+
+protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
+DeleteTopicsResult result = new DeleteTopicsResult();
+result.setNameFutures(nameFutures);
+return result;
+}
+
+protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
+DeleteTopicsResult result = new DeleteTopicsResult();
+result.setTopicIdFutures(topicIdFutures);
+return result;
+}
+
+/**
+ * Return a map from topic names to futures which can be used to check the 
status of
+ * individual deletions if the deleteTopics request used topic names. 
Otherwise return null.

Review comment:
   nit: can we use javadoc `@return`?
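
   For example, the accessor above could read (value type assumed to be KafkaFuture<Void>,
   as in the diff):

```java
/**
 * @return a map from topic names to futures which can be used to check the status of
 *         individual deletions, if the deleteTopics request used topic names; null otherwise
 */
public Map<String, KafkaFuture<Void>> topicNameValues() {
    return nameFutures;
}
```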

##
File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collection;
+
+/**
+ * A class used to represent a collection of topics. This collection may 
define topics by topic name
+ * or topic ID. Subclassing this class beyond the classes provided here is not 
supported.
+ */
+public abstract class TopicCollection {
+
+private TopicCollection() {}
+
+/**
+ * A class used to represent a collection of topics defined by their topic 
ID.
+ * Subclassing this class beyond the classes provided here is not 
supported.
+ */
+public static class TopicIdCollection 

[GitHub] [kafka] mjsax commented on pull request #10824: KAFKA-12718: SessionWindows are closed too early

2021-06-28 Thread GitBox


mjsax commented on pull request #10824:
URL: https://github.com/apache/kafka/pull/10824#issuecomment-870094158


   Thanks for your PR! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10824: KAFKA-12718: SessionWindows are closed too early

2021-06-28 Thread GitBox


mjsax merged pull request #10824:
URL: https://github.com/apache/kafka/pull/10824


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10893: KAFKA-12909: add missing tests

2021-06-28 Thread GitBox


mjsax merged pull request #10893:
URL: https://github.com/apache/kafka/pull/10893


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10894: KAFKA-12951: restore must terminate for tx global topic

2021-06-28 Thread GitBox


mjsax commented on pull request #10894:
URL: https://github.com/apache/kafka/pull/10894#issuecomment-870089809


   Merged to `trunk` and cherry-picked to `2.8` and `2.7` branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs

2021-06-28 Thread GitBox


guozhangwang commented on pull request #10930:
URL: https://github.com/apache/kafka/pull/10930#issuecomment-870078185


   > @guozhangwang With Errors.NONE, we throw OffsetOutOfRangeException in the 
follower when attempting to update follower's start offset based on the 
leader's start offset returned in the response:
   
   I see; I thought you meant there were some conditions on the follower's side that could 
still protect us from missing this error. Now I realize this condition may or may not be 
hit, and either case is bad:
   
   1. If it is not hit, we would end up not capturing this error and would proceed as if 
nothing went wrong.
   2. If it is hit, we throw OffsetOutOfRangeException on the follower's side to capture 
it, but we also move the partition to the failed state and cannot recover from that state.
   
   If my understanding here is correct, I think I can go ahead and merge the 
PR. BTW could you re-trigger the unit tests?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13005) Support JBOD in kraft mode

2021-06-28 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-13005:
-
Labels: kip-500  (was: )

> Support JBOD in kraft mode
> --
>
> Key: KAFKA-13005
> URL: https://issues.apache.org/jira/browse/KAFKA-13005
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…

2021-06-28 Thread GitBox


jsancio commented on a change in pull request #10932:
URL: https://github.com/apache/kafka/pull/10932#discussion_r660142781



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -157,13 +162,27 @@ public synchronized void 
handleSnapshot(SnapshotReader reader) {
 public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
 if (newLeader.isLeader(nodeId)) {
 log.debug("Counter uncommitted value initialized to {} after 
claiming leadership in epoch {}",
-committed, newLeader);
+committed, newLeader);
 uncommitted = committed;
 claimedEpoch = OptionalInt.of(newLeader.epoch());
 } else {
 log.debug("Counter uncommitted value reset after resigning 
leadership");
 uncommitted = -1;
 claimedEpoch = OptionalInt.empty();
 }
+handleSnapshotCalled = false;
+handleSnapshotCalls = 0;
+}
+
+public boolean isLeader() {
+return this.client.leaderAndEpoch().isLeader(nodeId);

Review comment:
   https://issues.apache.org/jira/browse/KAFKA-13006




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13006) Remove the method RaftClient.leaderAndEpoch

2021-06-28 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13006:
--

 Summary: Remove the method RaftClient.leaderAndEpoch
 Key: KAFKA-13006
 URL: https://issues.apache.org/jira/browse/KAFKA-13006
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio


There are semantic differences between {{RaftClient.leaderAndEpoch}} and 
{{RaftClient.Listener.handleLeaderChange}}, especially when the raft client transitions 
from follower to leader. To simplify the API, I think we should remove the method 
{{RaftClient.leaderAndEpoch}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

2021-06-28 Thread GitBox


cmccabe commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r660141890



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+
+/**
+ * Represents changes to the topics in the metadata image.
+ */
+public final class TopicsDelta {
+private final TopicsImage image;
+
+/**
+ * A map from topic IDs to the topic deltas for each topic. Topics which 
have been
+ * deleted will not appear in this map.
+ */
+private final Map<Uuid, TopicDelta> changedTopics = new HashMap<>();
+
+/**
+ * The IDs of topics that exist in the image but that have been deleted. 
Note that if
+ * a topic does not exist in the image, it will also not exist in this 
set. Topics
+ * that are created and then deleted within the same delta will leave no 
trace.
+ */
+private final Set<Uuid> deletedTopicIds = new HashSet<>();

Review comment:
   Sorry, I can't quite follow the question. It is certainly the case that 
we only replay each record once and exactly once, though, if that's the 
question?
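
   For readers following along, a hypothetical sketch of how a RemoveTopicRecord replay
   could maintain the two structures in the diff above. It assumes the base image exposes a
   topicsById() map; it is not a copy of the PR's code:

```java
// Hypothetical replay logic matching the invariants described in the field comments above.
public void replay(RemoveTopicRecord record) {
    // Drop any per-topic changes accumulated for this topic in the current delta.
    changedTopics.remove(record.topicId());
    // Record the deletion only if the topic existed in the base image; a topic created and
    // then deleted within the same delta leaves no trace.
    if (image.topicsById().containsKey(record.topicId())) {
        deletedTopicIds.add(record.topicId());
    }
}
```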




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…

2021-06-28 Thread GitBox


jsancio commented on a change in pull request #10932:
URL: https://github.com/apache/kafka/pull/10932#discussion_r660140438



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -38,6 +38,9 @@
 private OptionalInt claimedEpoch = OptionalInt.empty();
 private long lastOffsetSnapshotted = -1;
 
+private int handleSnapshotCalls = 0;
+private boolean handleSnapshotCalled = false;

Review comment:
   We should only store `handleSnapshotCalls` since `handleSnapshotCalled` 
is always equal to `handleSnapshotCalls > 0`.
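
   A small self-contained sketch of that simplification, keeping only the counter and
   deriving the boolean on demand (names follow the diff above; everything else is trimmed
   for illustration):

```java
public class SnapshotCallTracker {
    private int handleSnapshotCalls = 0;

    public synchronized void recordHandleSnapshotCall() {
        handleSnapshotCalls += 1;
    }

    public synchronized int handleSnapshotCalls() {
        return handleSnapshotCalls;
    }

    // handleSnapshotCalled is always equivalent to handleSnapshotCalls > 0,
    // so it does not need a field of its own.
    public synchronized boolean handleSnapshotCalled() {
        return handleSnapshotCalls > 0;
    }
}
```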

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -157,13 +162,27 @@ public synchronized void 
handleSnapshot(SnapshotReader reader) {
 public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
 if (newLeader.isLeader(nodeId)) {
 log.debug("Counter uncommitted value initialized to {} after 
claiming leadership in epoch {}",
-committed, newLeader);
+committed, newLeader);
 uncommitted = committed;
 claimedEpoch = OptionalInt.of(newLeader.epoch());
 } else {
 log.debug("Counter uncommitted value reset after resigning 
leadership");
 uncommitted = -1;
 claimedEpoch = OptionalInt.empty();
 }
+handleSnapshotCalled = false;
+handleSnapshotCalls = 0;
+}
+
+public boolean isLeader() {
+return this.client.leaderAndEpoch().isLeader(nodeId);

Review comment:
   I think this is the issue you reported in the Jira. The 
`RaftClient.Listener` should not use `RaftClient.leaderAndEpoch` to determine 
if it is the leader. It should instead use 
`RaftClient.Listener.handleLeaderChange`.
   
   For this state machine `ReplicatedCounter` we should look at the 
`claimedEpoch` variable.
   
   I am going to create an issue to remove this method. cc @hachikuji 
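
   A minimal sketch of that idea: the listener remembers leadership from handleLeaderChange
   and answers isLeader() from that state instead of consulting RaftClient.leaderAndEpoch().
   The LeaderAndEpoch parameter is simplified to plain ints here for brevity:

```java
import java.util.OptionalInt;

public class LeadershipTracker {
    private final int nodeId;
    private OptionalInt claimedEpoch = OptionalInt.empty();

    public LeadershipTracker(int nodeId) {
        this.nodeId = nodeId;
    }

    // Called from handleLeaderChange: remember the epoch only while we are the leader.
    public synchronized void onLeaderChange(int leaderId, int epoch) {
        claimedEpoch = (leaderId == nodeId) ? OptionalInt.of(epoch) : OptionalInt.empty();
    }

    public synchronized boolean isLeader() {
        return claimedEpoch.isPresent();
    }
}
```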




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13005) Support JBOD in kraft mode

2021-06-28 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13005:


 Summary: Support JBOD in kraft mode
 Key: KAFKA-13005
 URL: https://issues.apache.org/jira/browse/KAFKA-13005
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] niket-goel commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

2021-06-28 Thread GitBox


niket-goel commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660135431



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -204,35 +207,103 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
-public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+private void appendControlMessage(
+Supplier<MemoryRecords> supplier,
+ByteBuffer buffer
+) {
 appendLock.lock();
 try {
 forceDrain();
-ByteBuffer buffer = memoryPool.tryAllocate(256);
-if (buffer != null) {
-MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
-this.nextOffset, 
-currentTimeMs, 
-this.epoch, 
-buffer, 
-leaderChangeMessage
-);
-completed.add(new CompletedBatch<>(
-nextOffset,
-1,
-data,
-memoryPool,
-buffer
-));
-nextOffset += 1;
-} else {
-throw new IllegalStateException("Could not allocate buffer for 
the leader change record.");
-}
+completed.add(new CompletedBatch<>(
+nextOffset,
+1,
+supplier.get(),
+memoryPool,
+buffer
+));
+nextOffset += 1;
 } finally {
 appendLock.unlock();
 }
 }
 
+/**
+ * Append a {@link LeaderChangeMessage} record to the batch
+ *
+ * @param @LeaderChangeMessage The message to append
+ * @param @currentTimeMs The timestamp of message generation
+ * @throws IllegalStateException on failure to allocate a buffer for the 
record
+ */
+public void appendLeaderChangeMessage(
+LeaderChangeMessage leaderChangeMessage,
+long currentTimeMs
+) {
+ByteBuffer buffer = memoryPool.tryAllocate(256);

Review comment:
   The code organization gets awkward if we do that, because the byte buffer is needed for 
the MemoryRecords API as well. Will sync with you offline on this.
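
   One possible shape for keeping the allocation inside appendControlMessage while the
   MemoryRecords factory still receives the buffer it needs: pass a function from the
   allocated buffer to the records. This reuses the fields of the class in the diff above
   (memoryPool, appendLock, completed, nextOffset) and is an illustration only, not the
   PR's code; imports for Function, ByteBuffer and MemoryRecords are omitted:

```java
private void appendControlMessage(Function<ByteBuffer, MemoryRecords> valueCreator) {
    // Allocation and the null check move into the shared helper.
    ByteBuffer buffer = memoryPool.tryAllocate(256);
    if (buffer == null) {
        throw new IllegalStateException("Could not allocate buffer for the control record.");
    }
    appendLock.lock();
    try {
        forceDrain();
        completed.add(new CompletedBatch<>(
            nextOffset,
            1,
            valueCreator.apply(buffer),
            memoryPool,
            buffer
        ));
        nextOffset += 1;
    } finally {
        appendLock.unlock();
    }
}
```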




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] niket-goel commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

2021-06-28 Thread GitBox


niket-goel commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660134385



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -204,35 +207,103 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
-public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+private void appendControlMessage(
+Supplier<MemoryRecords> supplier,
+ByteBuffer buffer
+) {
 appendLock.lock();
 try {
 forceDrain();
-ByteBuffer buffer = memoryPool.tryAllocate(256);
-if (buffer != null) {
-MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
-this.nextOffset, 
-currentTimeMs, 
-this.epoch, 
-buffer, 
-leaderChangeMessage
-);
-completed.add(new CompletedBatch<>(
-nextOffset,
-1,
-data,
-memoryPool,
-buffer
-));
-nextOffset += 1;
-} else {
-throw new IllegalStateException("Could not allocate buffer for 
the leader change record.");
-}
+completed.add(new CompletedBatch<>(
+nextOffset,
+1,
+supplier.get(),
+memoryPool,
+buffer
+));
+nextOffset += 1;
 } finally {
 appendLock.unlock();
 }
 }
 
+/**
+ * Append a {@link LeaderChangeMessage} record to the batch
+ *
+ * @param @LeaderChangeMessage The message to append
+ * @param @currentTimeMs The timestamp of message generation
+ * @throws IllegalStateException on failure to allocate a buffer for the 
record
+ */
+public void appendLeaderChangeMessage(
+LeaderChangeMessage leaderChangeMessage,
+long currentTimeMs
+) {
+ByteBuffer buffer = memoryPool.tryAllocate(256);
+if (buffer != null) {
+appendControlMessage(
+() -> MemoryRecords.withLeaderChangeMessage(
+this.nextOffset,
+currentTimeMs,
+this.epoch,
+buffer,
+leaderChangeMessage),
+buffer);
+} else {
+throw new IllegalStateException("Could not allocate buffer for the 
leader change record.");
+}
+}
+
+
+/**
+ * Append a {@link SnapshotHeaderRecord} record to the batch
+ *
+ * @param @SnapshotHeaderRecord The message to append
+ * @throws IllegalStateException on failure to allocate a buffer for the 
record
+ */
+public void appendSnapshotHeaderMessage(
+SnapshotHeaderRecord snapshotHeaderRecord,
+long currentTimeMs
+) {
+ByteBuffer buffer = memoryPool.tryAllocate(256);
+if (buffer != null) {
+appendControlMessage(
+() -> MemoryRecords.withSnapshotHeaderRecord(
+this.nextOffset,
+currentTimeMs,
+this.epoch,
+buffer,
+snapshotHeaderRecord),
+buffer);
+} else {
+throw new IllegalStateException("Could not allocate buffer for the 
metadata snapshot header record.");
+}
+}
+
+/**
+ * Append a {@link SnapshotFooterRecord} record to the batch
+ *
+ * @param @SnapshotFooterRecord The message to append
+ * @throws IllegalStateException on failure to allocate a buffer for the 
record
+ */
+public void appendSnapshotFooterMessage(
+SnapshotFooterRecord snapshotFooterRecord

Review comment:
   Will do

##
File path: 
raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##
@@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception {
 );
 }
 
+private int validateDelimiters(
+RawSnapshotReader snapshot,
+long lastContainedLogTime
+) {
+assertNotEquals(0, snapshot.sizeInBytes());
+
+int countRecords = 0;
+
+Iterator recordBatches = 
Utils.covariantCast(snapshot.records().batchIterator());
+
+assertEquals(Boolean.TRUE, recordBatches.hasNext());

Review comment:
   thanks

##
File path: 
raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##
@@ -22,25 +22,65 @@
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
+import java.util.Iterator;
 import java.util.Set;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import 

[GitHub] [kafka] niket-goel commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

2021-06-28 Thread GitBox


niket-goel commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660133212



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
##
@@ -26,19 +28,49 @@
  */
 public class ControlRecordUtils {
 
-public static final short LEADER_CHANGE_SCHEMA_VERSION = new 
LeaderChangeMessage().highestSupportedVersion();
+public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = new 
LeaderChangeMessage().highestSupportedVersion();
+public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = new 
SnapshotHeaderRecord().highestSupportedVersion();
+public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = new 
SnapshotFooterRecord().highestSupportedVersion();
 
 public static LeaderChangeMessage deserializeLeaderChangeMessage(Record 
record) {
 ControlRecordType recordType = ControlRecordType.parse(record.key());
 if (recordType != ControlRecordType.LEADER_CHANGE) {
 throw new IllegalArgumentException(
-"Expected LEADER_CHANGE control record type(3), but found " + 
recordType.toString());
+"Expected LEADER_CHANGE control record type(2), but found " + 
recordType.toString());
 }
 return deserializeLeaderChangeMessage(record.value().duplicate());
 }
 
 public static LeaderChangeMessage 
deserializeLeaderChangeMessage(ByteBuffer data) {

Review comment:
   Will cut a JIRA.

##
File path: clients/src/main/resources/common/message/SnapshotHeaderRecord.json
##
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "SnapshotHeaderRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{"name": "Version", "type": "int16", "versions": "0+",
+  "about": "The version of the snapshot header record"},
+{"name": "LastContainedLogTime", "type": "int64", "versions": "0+",
+  "about": "The append time of the highest record contained in this 
snapshot"}

Review comment:
   That sounds better. Will use this.

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -345,7 +345,8 @@ void createSnapshotGenerator(long committedOffset, int 
committedEpoch) {
 }
 Optional<SnapshotWriter<ApiMessageAndVersion>> writer = 
raftClient.createSnapshot(
 committedOffset,
-committedEpoch
+committedEpoch,
+0/*KAFKA-12997*/

Review comment:
   ack.

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -106,7 +106,7 @@ public synchronized void handleCommit(BatchReader 
reader) {
 lastCommittedEpoch,
 lastOffsetSnapshotted
 );
-Optional<SnapshotWriter<Integer>> snapshot = 
client.createSnapshot(lastCommittedOffset, lastCommittedEpoch);
+Optional<SnapshotWriter<Integer>> snapshot = 
client.createSnapshot(lastCommittedOffset, lastCommittedEpoch, 
0/*KAFKA-12997*/);

Review comment:
   ack




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660127887



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -193,18 +197,22 @@ class CachedPartition(val topic: String,
   * Each fetch session is protected by its own lock, which must be taken 
before mutable
   * fields are read or modified.  This includes modification of the session 
partition map.
   *
-  * @param id   The unique fetch session ID.
-  * @param privileged   True if this session is privileged.  Sessions crated 
by followers
-  * are privileged; sesssion created by consumers are not.
-  * @param partitionMap The CachedPartitionMap.
-  * @param creationMs   The time in milliseconds when this session was created.
-  * @param lastUsedMs   The last used time in milliseconds.  This should only 
be updated by
-  * FetchSessionCache#touch.
-  * @param epochThe fetch session sequence number.
+  * @param id The unique fetch session ID.
+  * @param privileged True if this session is privileged.  Sessions 
crated by followers
+  *   are privileged; session created by consumers are 
not.
+  * @param partitionMap   The CachedPartitionMap.
+  * @param usesTopicIds   True if this session is using topic IDs
+  * @param sessionTopicIdsThe mapping from topic name to topic ID for 
topics in the session.
+  * @param creationMs The time in milliseconds when this session was 
created.
+  * @param lastUsedMs The last used time in milliseconds.  This should 
only be updated by
+  *   FetchSessionCache#touch.
+  * @param epoch  The fetch session sequence number.
   */
 class FetchSession(val id: Int,
val privileged: Boolean,
val partitionMap: FetchSession.CACHE_MAP,
+   val usesTopicIds: Boolean,
+   val sessionTopicIds: FetchSession.TOPIC_ID_MAP,

Review comment:
   Taking a second look, seems like we just use partitionMap.size. Not sure 
if it is useful to have sessionTopicIds size (and if the whole map is too 
much). I'm thinking maybe just including the usesTopicIds boolean.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13004) Trogdor performance decreases sharply with large amounts of tasks.

2021-06-28 Thread Scott Hendricks (Jira)
Scott Hendricks created KAFKA-13004:
---

 Summary: Trogdor performance decreases sharply with large amounts 
of tasks.
 Key: KAFKA-13004
 URL: https://issues.apache.org/jira/browse/KAFKA-13004
 Project: Kafka
  Issue Type: Bug
  Components: tools
 Environment: We run our Trogdor clusters within Kubernetes.
Reporter: Scott Hendricks
Assignee: Scott Hendricks


As part of my performance tests, I am running 3000 workloads within Trogdor.  
The clients seem to be able to handle this fine, but when I go to reset and run 
the same test again, Trogdor seems sluggish.

Here are the steps to reproduce this:
 # Run 3000 workloads in Trogdor, a combination of Produce/Consume workloads.
 # Wait for the workloads to complete.
 # Run the DELETE API calls to destroy all 3000 workloads to reset for the next 
run.
 # Confirm via the API that there are no workloads defined in the system.
 # Run an additional 3000 workloads in Trogdor similar to step 1.

The Coordinator takes a long time to start the second batch of 3000. There 
seems to be some performance issue in the framework that will take a while to 
debug. At this point I don't know if it only affects the Coordinator, or if the 
Agents are affected as well. I do not currently have the time to look into 
this, so I am creating this issue to track it.

The workaround I am employing is destroying and recreating the Trogdor cluster 
in between test runs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660122565



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time   The clock to use.
   * @param cache  The fetch session cache.
   * @param reqMetadataThe request metadata.
+  * @param versionThe version of the request,
   * @param fetchData  The partition data from the fetch request.
+  * @param topicIds   The map from topic names to topic IDs.
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val reqMetadata: JFetchMetadata,
+   private val version: Short,
private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+   private val topicIds: util.Map[String, Uuid],
private val isFromFollower: Boolean) extends 
FetchContext {
   override def getFetchOffset(part: TopicPartition): Option[Long] =
 Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-fetchData.forEach(fun(_, _))
+  override def foreachPartition(fun: (TopicPartition, Uuid, 
FetchRequest.PartitionData) => Unit): Unit = {
+fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse = {
-def createNewSession: FetchSession.CACHE_MAP = {
+var error = Errors.NONE
+def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) 
= {
   val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+  val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
   updates.forEach { (part, respData) =>
+if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code())
+  error = Errors.INCONSISTENT_TOPIC_ID
 val reqData = fetchData.get(part)
-cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
+cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, 
respData))
+if (id != Uuid.ZERO_UUID)
+  sessionTopicIds.put(part.topic, id)
   }
-  cachedPartitions
+  (cachedPartitions, sessionTopicIds)
 }
 val responseSessionId = cache.maybeCreateSession(time.milliseconds(), 
isFromFollower,
-updates.size, () => createNewSession)
+updates.size, version, () => createNewSession)
 debug(s"Full fetch context with session id $responseSessionId returning " +
   s"${partitionsToLogString(updates.keySet)}")
-FetchResponse.of(Errors.NONE, 0, responseSessionId, updates)
+FetchResponse.of(error, 0, responseSessionId, updates, topicIds)

Review comment:
   I guess the only issue with using FetchRequest.getErrorResponse is that 
we may have different topics in the response than in the request. 
SessionErrorContext deals with this by simply having an empty response besides 
the top level error. I'm wondering if we should do something like this. 
(Likewise, with the UNKNOWN_TOPIC_ID error, should we also just send back an 
empty response?)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10894: KAFKA-12951: restore must terminate for tx global topic

2021-06-28 Thread GitBox


mjsax merged pull request #10894:
URL: https://github.com/apache/kafka/pull/10894


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


ccding commented on pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#issuecomment-870025479


   @junrao Thanks for the review. I have addressed the comments. Please take a 
look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r660101025



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,58 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction or segment 
split operation. We can
+// simply rename them to regular segment files. But, before renaming, we 
should figure out which
+// segments are compacted and delete these segment files: this is done by 
calculating min/maxSwapFileOffset.

Review comment:
   fixed

##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,58 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction or segment 
split operation. We can
+// simply rename them to regular segment files. But, before renaming, we 
should figure out which
+// segments are compacted and delete these segment files: this is done by 
calculating min/maxSwapFileOffset.
+// We store segments that require renaming in this code block, and do the 
actual renaming later.
+var minSwapFileOffset = Long.MaxValue
+var maxSwapFileOffset = Long.MinValue
+swapFiles.filter(f => Log.isLogFile(new 
File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "".foreach { f =>
+  val baseOffset = offsetFromFile(f)
+  val segment = LogSegment.open(f.getParentFile,
+baseOffset = baseOffset,
+params.config,
+time = params.time,
+fileSuffix = Log.SwapFileSuffix)
+  info(s"${params.logIdentifier}Found log file ${f.getPath} from 
interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} 
files by renaming.")
+  minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+  maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, 
maxSwapFileOffset)
+}
+
+// Second pass: delete segments that are between minSwapFileOffset and 
maxSwapFileOffset. As
+// discussed above, these segments were compacted or split but haven't 
been renamed to .delete
+// before shutting down the broker.
+for (file <- params.dir.listFiles if file.isFile) {
+  try {
+if (!file.getName.endsWith(SwapFileSuffix)) {
+  val offset = offsetFromFile(file)
+  if (offset >= minSwapFileOffset && offset <= maxSwapFileOffset) {

Review comment:
   fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stan-confluent commented on a change in pull request #10915: Enable connecting VS Code remote debugger

2021-06-28 Thread GitBox


stan-confluent commented on a change in pull request #10915:
URL: https://github.com/apache/kafka/pull/10915#discussion_r660083163



##
File path: tests/README.md
##
@@ -51,6 +51,40 @@ bash tests/docker/ducker-ak up -j 'openjdk:11'; 
tests/docker/run_tests.sh
 ```
 REBUILD="t" bash tests/docker/run_tests.sh
 ```
+* Debug tests in VS Code:
+  - Run test with `--debug` flag (can be before or after file name):

Review comment:
   @omkreddy I've updated the run_tests.sh file to pass ducktape args after `--` if those 
are present; can you please take another look?
   I've also run shellcheck and fixed the violations found in the changed bits of code.
   Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660068452



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time,
   if (session.epoch != expectedEpoch) {
 info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
   s"got ${session.epoch}.  Possible duplicate request.")
-FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
   } else {
+var error = Errors.NONE
 // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
+// It will also set the top-level error to INCONSISTENT_TOPIC_ID if 
any partitions had this error.
 val partitionIter = new PartitionIterator(updates.entrySet.iterator, 
true)
 while (partitionIter.hasNext) {
-  partitionIter.next()
+  if (partitionIter.next().getValue.errorCode() == 
Errors.INCONSISTENT_TOPIC_ID.code())
+error = Errors.INCONSISTENT_TOPIC_ID

Review comment:
   I'm still not sure I follow "pending fetch request could still reference the outdated 
Partition object and therefore miss the topicId change". My understanding is that the log 
is the source of truth: we will either read from the log if the ID matches, or not read 
if it doesn't. I see that we could get an erroneous error if the partition didn't update 
in time, but I don't see us being able to read from the log due to a stale partition.
   
   Or are you referring to the getPartitionOrException(tp) call picking up a stale 
partition, so that both the request and the partition are stale? In that case, we will 
read from the log but will identify it with its correct ID, and the client will handle it 
based on that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ueisele commented on a change in pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

2021-06-28 Thread GitBox


ueisele commented on a change in pull request #10935:
URL: https://github.com/apache/kafka/pull/10935#discussion_r660063967



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -283,7 +283,7 @@ class BrokerServer(
 networkListeners.add(new Listener().
   setHost(if (Utils.isBlank(ep.host)) 
InetAddress.getLocalHost.getCanonicalHostName else ep.host).
   setName(ep.listenerName.value()).
-  setPort(socketServer.boundPort(ep.listenerName)).
+  setPort(ep.port).

Review comment:
   Yes, sure. I will add a test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660062652



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time   The clock to use.
   * @param cache  The fetch session cache.
   * @param reqMetadataThe request metadata.
+  * @param versionThe version of the request,
   * @param fetchData  The partition data from the fetch request.
+  * @param topicIds   The map from topic names to topic IDs.
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val reqMetadata: JFetchMetadata,
+   private val version: Short,
private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+   private val topicIds: util.Map[String, Uuid],
private val isFromFollower: Boolean) extends 
FetchContext {
   override def getFetchOffset(part: TopicPartition): Option[Long] =
 Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-fetchData.forEach(fun(_, _))
+  override def foreachPartition(fun: (TopicPartition, Uuid, 
FetchRequest.PartitionData) => Unit): Unit = {
+fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse = {
-def createNewSession: FetchSession.CACHE_MAP = {
+var error = Errors.NONE
+def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) 
= {
   val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+  val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
   updates.forEach { (part, respData) =>
+if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code())
+  error = Errors.INCONSISTENT_TOPIC_ID
 val reqData = fetchData.get(part)
-cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
+cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, 
respData))
+if (id != Uuid.ZERO_UUID)
+  sessionTopicIds.put(part.topic, id)
   }
-  cachedPartitions
+  (cachedPartitions, sessionTopicIds)
 }
 val responseSessionId = cache.maybeCreateSession(time.milliseconds(), 
isFromFollower,
-updates.size, () => createNewSession)
+updates.size, version, () => createNewSession)
 debug(s"Full fetch context with session id $responseSessionId returning " +
   s"${partitionsToLogString(updates.keySet)}")
-FetchResponse.of(Errors.NONE, 0, responseSessionId, updates)
+FetchResponse.of(error, 0, responseSessionId, updates, topicIds)

Review comment:
   I think this goes back to the question of whether it is useful for us to 
have information on the specific partition that failed. If we do this, should 
we also return the error values for the other fields as we do in 
FetchRequest.getErrorResponse?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660061358



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -193,18 +197,22 @@ class CachedPartition(val topic: String,
   * Each fetch session is protected by its own lock, which must be taken 
before mutable
   * fields are read or modified.  This includes modification of the session 
partition map.
   *
-  * @param id   The unique fetch session ID.
-  * @param privileged   True if this session is privileged.  Sessions crated 
by followers
-  * are privileged; sesssion created by consumers are not.
-  * @param partitionMap The CachedPartitionMap.
-  * @param creationMs   The time in milliseconds when this session was created.
-  * @param lastUsedMs   The last used time in milliseconds.  This should only 
be updated by
-  * FetchSessionCache#touch.
-  * @param epochThe fetch session sequence number.
+  * @param id The unique fetch session ID.
+  * @param privileged True if this session is privileged.  Sessions 
crated by followers
+  *   are privileged; session created by consumers are 
not.
+  * @param partitionMap   The CachedPartitionMap.
+  * @param usesTopicIds   True if this session is using topic IDs
+  * @param sessionTopicIdsThe mapping from topic name to topic ID for 
topics in the session.
+  * @param creationMs The time in milliseconds when this session was 
created.
+  * @param lastUsedMs The last used time in milliseconds.  This should 
only be updated by
+  *   FetchSessionCache#touch.
+  * @param epoch  The fetch session sequence number.
   */
 class FetchSession(val id: Int,
val privileged: Boolean,
val partitionMap: FetchSession.CACHE_MAP,
+   val usesTopicIds: Boolean,
+   val sessionTopicIds: FetchSession.TOPIC_ID_MAP,

Review comment:
   I suppose it won't hurt :)

##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time   The clock to use.
   * @param cache  The fetch session cache.
   * @param reqMetadataThe request metadata.
+  * @param versionThe version of the request,
   * @param fetchData  The partition data from the fetch request.
+  * @param topicIds   The map from topic names to topic IDs.
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val reqMetadata: JFetchMetadata,
+   private val version: Short,

Review comment:
   We can do that to make things clearer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660061129



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -231,20 +239,31 @@ class FetchSession(val id: Int,
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
   def getFetchOffset(topicPartition: TopicPartition): Option[Long] = 
synchronized {
-Option(partitionMap.find(new 
CachedPartition(topicPartition))).map(_.fetchOffset)
+Option(partitionMap.find(new CachedPartition(topicPartition,
+  sessionTopicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID.map(_.fetchOffset)
   }
 
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
  toForget: util.List[TopicPartition],
- reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+ reqMetadata: JFetchMetadata,
+ topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = 
synchronized {
 val added = new TL
 val updated = new TL
 val removed = new TL
+val inconsistentTopicIds = new TL
 fetchData.forEach { (topicPart, reqData) =>
-  val newCachedPart = new CachedPartition(topicPart, reqData)
+  // Get the topic ID on the broker, if it is valid and the topic is new 
to the session, add its ID.
+  // If the topic already existed, check that its ID is consistent.
+  val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
+  val newCachedPart = new CachedPartition(topicPart, id, reqData)
+  if (id != Uuid.ZERO_UUID) {
+val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)

Review comment:
   If a topic ID changes, the FetchSession will become a FetchErrorSession 
and close. I can change to putIfAbsent if it makes things clearer, but all this 
state will go away upon an error + session close.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


jsancio commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r660023057



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -514,6 +514,16 @@ public UnattachedState unattachedStateOrThrow() {
 throw new IllegalStateException("Expected to be Leader, but current 
state is " + state);
 }
 
+@SuppressWarnings("unchecked")
+public  Optional> maybeLeaderState() {
+EpochState state = this.state;
+if (state instanceof  LeaderState) {
+return Optional.of((LeaderState) state);
+} else {
+return Optional.empty();
+}
+}

Review comment:
   Thank you. You read my mind. I was thinking of something similar after I 
left my previous comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

2021-06-28 Thread GitBox


jsancio commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660032373



##
File path: clients/src/main/resources/common/message/SnapshotHeaderRecord.json
##
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "SnapshotHeaderRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{"name": "Version", "type": "int16", "versions": "0+",
+  "about": "The version of the snapshot header record"},
+{"name": "LastContainedLogTime", "type": "int64", "versions": "0+",
+  "about": "The append time of the highest record contained in this 
snapshot"}

Review comment:
   How about "The append time of the last record from the log contained in 
the snapshot"?

##
File path: 
raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##
@@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception {
 );
 }
 
+private int validateDelimiters(
+RawSnapshotReader snapshot,
+long lastContainedLogTime
+) {
+assertNotEquals(0, snapshot.sizeInBytes());
+
+int countRecords = 0;
+
+Iterator recordBatches = 
Utils.covariantCast(snapshot.records().batchIterator());
+
+assertEquals(Boolean.TRUE, recordBatches.hasNext());

Review comment:
   You can use `assertTrue` instead. This comment applies to a few places.

##
File path: clients/src/main/resources/common/message/SnapshotHeaderRecord.json
##
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "SnapshotHeaderRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{"name": "Version", "type": "int16", "versions": "0+",
+  "about": "The version of the snapshot header record"},
+{"name": "LastContainedLogTime", "type": "int64", "versions": "0+",

Review comment:
   Let's call this `LastContainedLogTimestamp` to match Kafka's existing 
use of "Timestamp"`.

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -204,35 +207,103 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
-public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+private void appendControlMessage(
+Supplier supplier,
+ByteBuffer buffer
+) {
 appendLock.lock();
 try {
 forceDrain();
-ByteBuffer buffer = memoryPool.tryAllocate(256);
-if (buffer != null) {
-MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
-this.nextOffset, 
-currentTimeMs, 
-this.epoch, 
-buffer, 
-leaderChangeMessage
-);
-completed.add(new CompletedBatch<>(
-nextOffset,
-1,
-data,
-memoryPool,
-buffer
-));
-nextOffset += 1;
-} else {
-throw new IllegalStateException("Could not allocate buffer for 
the leader change record.");
-}
+completed.add(new CompletedBatch<>(
+nextOffset,
+1,
+

[GitHub] [kafka] jacky1193610322 commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

2021-06-28 Thread GitBox


jacky1193610322 commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r660054798



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+
+/**
+ * Represents changes to the topics in the metadata image.
+ */
+public final class TopicsDelta {
+private final TopicsImage image;
+
+/**
+ * A map from topic IDs to the topic deltas for each topic. Topics which 
have been
+ * deleted will not appear in this map.
+ */
+private final Map changedTopics = new HashMap<>();
+
+/**
+ * The IDs of topics that exist in the image but that have been deleted. 
Note that if
+ * a topic does not exist in the image, it will also not exist in this 
set. Topics
+ * that are created and then deleted within the same delta will leave no 
trace.
+ */
+private final Set deletedTopicIds = new HashSet<>();

Review comment:
   Thanks for your reply, it's right if will always keep the delta not 
replay before, which means we must persist the checkpoint offset every commit, 
but now we are not. for example, if the broker crash, we maybe replay the 
latest snapshot and the delta after the snapshot offset, but it may be the 
delta after the snapshot has been replay before. I don't sure it's right? thank 
you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-28 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660016282



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -193,18 +197,22 @@ class CachedPartition(val topic: String,
   * Each fetch session is protected by its own lock, which must be taken 
before mutable
   * fields are read or modified.  This includes modification of the session 
partition map.
   *
-  * @param id   The unique fetch session ID.
-  * @param privileged   True if this session is privileged.  Sessions crated 
by followers
-  * are privileged; sesssion created by consumers are not.
-  * @param partitionMap The CachedPartitionMap.
-  * @param creationMs   The time in milliseconds when this session was created.
-  * @param lastUsedMs   The last used time in milliseconds.  This should only 
be updated by
-  * FetchSessionCache#touch.
-  * @param epochThe fetch session sequence number.
+  * @param id The unique fetch session ID.
+  * @param privileged True if this session is privileged.  Sessions 
crated by followers
+  *   are privileged; session created by consumers are 
not.
+  * @param partitionMap   The CachedPartitionMap.
+  * @param usesTopicIds   True if this session is using topic IDs
+  * @param sessionTopicIdsThe mapping from topic name to topic ID for 
topics in the session.
+  * @param creationMs The time in milliseconds when this session was 
created.
+  * @param lastUsedMs The last used time in milliseconds.  This should 
only be updated by
+  *   FetchSessionCache#touch.
+  * @param epoch  The fetch session sequence number.
   */
 class FetchSession(val id: Int,
val privileged: Boolean,
val partitionMap: FetchSession.CACHE_MAP,
+   val usesTopicIds: Boolean,
+   val sessionTopicIds: FetchSession.TOPIC_ID_MAP,

Review comment:
   Do we need to include the new fields in toString()?

##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -231,20 +239,31 @@ class FetchSession(val id: Int,
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
   def getFetchOffset(topicPartition: TopicPartition): Option[Long] = 
synchronized {
-Option(partitionMap.find(new 
CachedPartition(topicPartition))).map(_.fetchOffset)
+Option(partitionMap.find(new CachedPartition(topicPartition,
+  sessionTopicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID.map(_.fetchOffset)
   }
 
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
  toForget: util.List[TopicPartition],
- reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+ reqMetadata: JFetchMetadata,
+ topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = 
synchronized {
 val added = new TL
 val updated = new TL
 val removed = new TL
+val inconsistentTopicIds = new TL
 fetchData.forEach { (topicPart, reqData) =>
-  val newCachedPart = new CachedPartition(topicPart, reqData)
+  // Get the topic ID on the broker, if it is valid and the topic is new 
to the session, add its ID.
+  // If the topic already existed, check that its ID is consistent.
+  val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
+  val newCachedPart = new CachedPartition(topicPart, id, reqData)
+  if (id != Uuid.ZERO_UUID) {
+val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)

Review comment:
   It seems that we should never change the topicId in sessionTopicIds? 
Perhaps we should use putIfAbsent.
   
   Similarly, if the topicId changes, I am not sure if we should update 
partitionMap below.

##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -471,16 +505,19 @@ class IncrementalFetchContext(private val time: Time,
   if (session.epoch != expectedEpoch) {
 info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
   s"got ${session.epoch}.  Possible duplicate request.")
-FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
   } else {
+var error = Errors.NONE

Review comment:
   error => topLevelError?

##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time,
   if (session.epoch != expectedEpoch) {
 info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, 

[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r660039117



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -192,20 +230,10 @@ object LogLoader extends Logging {
 debug(s"${params.logIdentifier}Deleting stray temporary file 
${file.getAbsolutePath}")
 Files.deleteIfExists(file.toPath)
   } else if (filename.endsWith(CleanedFileSuffix)) {
-minCleanedFileOffset = Math.min(offsetFromFileName(filename), 
minCleanedFileOffset)
-cleanFiles += file
+minCleanedFileOffset = Math.min(offsetFromFile(file), 
minCleanedFileOffset)
+cleanedFiles += file
   } else if (filename.endsWith(SwapFileSuffix)) {
-// we crashed in the middle of a swap operation, to recover:
-// if a log, delete the index files, complete the swap operation later
-// if an index just delete the index files, they will be rebuilt
-val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, 
SwapFileSuffix, ""))
-info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from 
interrupted swap operation.")
-if (Log.isIndexFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-} else if (Log.isLogFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-  swapFiles += file
-}
+swapFiles += file

Review comment:
   Due to KAFKA-6264, if there are any .cleaned files (no matter they are 
.index.cleaned or .log.cleaned), we delete all .cleaned files and .swap files 
that have larger/equal base offsets. Basically, this reverts ongoing 
compaction/split operations. Therefore, we don't have any additional 
.index.cleaned files.
   
   Is that fair?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r660039117



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -192,20 +230,10 @@ object LogLoader extends Logging {
 debug(s"${params.logIdentifier}Deleting stray temporary file 
${file.getAbsolutePath}")
 Files.deleteIfExists(file.toPath)
   } else if (filename.endsWith(CleanedFileSuffix)) {
-minCleanedFileOffset = Math.min(offsetFromFileName(filename), 
minCleanedFileOffset)
-cleanFiles += file
+minCleanedFileOffset = Math.min(offsetFromFile(file), 
minCleanedFileOffset)
+cleanedFiles += file
   } else if (filename.endsWith(SwapFileSuffix)) {
-// we crashed in the middle of a swap operation, to recover:
-// if a log, delete the index files, complete the swap operation later
-// if an index just delete the index files, they will be rebuilt
-val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, 
SwapFileSuffix, ""))
-info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from 
interrupted swap operation.")
-if (Log.isIndexFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-} else if (Log.isLogFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-  swapFiles += file
-}
+swapFiles += file

Review comment:
   Due to KAFKA-6264, if there are any .cleaned files (no matter they are 
.index.cleaned or .log.cleaned), we delete all .cleaned files and .swap files 
that have larger/equal base offsets. Basically, this reverts ongoing 
compaction/split operations.
   
   Therefore, we don't have any additional .index.cleaned files.
   
   Is that fair?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r660039117



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -192,20 +230,10 @@ object LogLoader extends Logging {
 debug(s"${params.logIdentifier}Deleting stray temporary file 
${file.getAbsolutePath}")
 Files.deleteIfExists(file.toPath)
   } else if (filename.endsWith(CleanedFileSuffix)) {
-minCleanedFileOffset = Math.min(offsetFromFileName(filename), 
minCleanedFileOffset)
-cleanFiles += file
+minCleanedFileOffset = Math.min(offsetFromFile(file), 
minCleanedFileOffset)
+cleanedFiles += file
   } else if (filename.endsWith(SwapFileSuffix)) {
-// we crashed in the middle of a swap operation, to recover:
-// if a log, delete the index files, complete the swap operation later
-// if an index just delete the index files, they will be rebuilt
-val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, 
SwapFileSuffix, ""))
-info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from 
interrupted swap operation.")
-if (Log.isIndexFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-} else if (Log.isLogFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-  swapFiles += file
-}
+swapFiles += file

Review comment:
   Due to KAFKA-6264, if there are any .cleaned files (no matter they are 
.index.cleaned or .log.cleaned), we delete all .cleaned files and .swap files 
that have larger/equal base offsets. Basically, this reverts ongoing 
compaction/split operations.
   
   Therefore, at this point, we don't have any .cleaned files.
   
   Is that fair?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r660039117



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -192,20 +230,10 @@ object LogLoader extends Logging {
 debug(s"${params.logIdentifier}Deleting stray temporary file 
${file.getAbsolutePath}")
 Files.deleteIfExists(file.toPath)
   } else if (filename.endsWith(CleanedFileSuffix)) {
-minCleanedFileOffset = Math.min(offsetFromFileName(filename), 
minCleanedFileOffset)
-cleanFiles += file
+minCleanedFileOffset = Math.min(offsetFromFile(file), 
minCleanedFileOffset)
+cleanedFiles += file
   } else if (filename.endsWith(SwapFileSuffix)) {
-// we crashed in the middle of a swap operation, to recover:
-// if a log, delete the index files, complete the swap operation later
-// if an index just delete the index files, they will be rebuilt
-val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, 
SwapFileSuffix, ""))
-info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from 
interrupted swap operation.")
-if (Log.isIndexFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-} else if (Log.isLogFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-  swapFiles += file
-}
+swapFiles += file

Review comment:
   Due to KAFKA-6264, if there are any .cleaned files (no matter they are 
.index.cleaned or .log.cleaned), we delete all .cleaned files and .swap files 
that have larger base offset. Basically, this reverts ongoing compaction/split 
operations.
   
   Therefore, at this point, we don't have any .cleaned files.
   
   Is that fair?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


mumrah commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r660029217



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2250,7 +2249,40 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 
 @Override
 public void resign(int epoch) {
-throw new UnsupportedOperationException();
+if (epoch < 0) {
+throw new IllegalArgumentException("Attempt to resign from an 
invalid negative epoch " + epoch);
+}
+
+if (!quorum.isVoter()) {
+throw new IllegalArgumentException("Attempt to resign by a 
non-voter");
+}
+
+LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+int currentEpoch = leaderAndEpoch.epoch();
+
+if (epoch > currentEpoch) {
+throw new IllegalArgumentException("Attempt to resign from epoch " 
+ epoch +
+" which is larger than the current epoch " + currentEpoch);
+} else if (epoch < currentEpoch) {
+// If the passed epoch is smaller than the current epoch, then it 
might mean
+// that the listener has not been notified about a leader change 
that already
+// took place. In this case, we consider the call as already 
fulfilled and
+// take no further action.
+return;
+} else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {

Review comment:
   I see what you mean, and yea that is a fair point  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs

2021-06-28 Thread GitBox


rajinisivaram commented on pull request #10930:
URL: https://github.com/apache/kafka/pull/10930#issuecomment-869929738


   @guozhangwang With Errors.NONE, we throw OffsetOutOfRangeException in the 
follower when attempting to update follower's start offset based on the 
leader's start offset returned in the response: 
https://github.com/apache/kafka/blob/397fa1f894c176d71601183c36e5d498fc83fd1e/core/src/main/scala/kafka/log/Log.scala#L997.
  Since that is a safeguard that existed prior to the new code in the leader to 
process diverging epochs for IBP 2.7 and higher, it seems safer to retain it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mdedetrich edited a comment on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

2021-06-28 Thread GitBox


mdedetrich edited a comment on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-869693584


   Is there anything left on this PR to be merged (apart from the changelog 
which is a nice to have)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


mumrah commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r660025194



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) {
 LeaderState state = quorum.leaderStateOrThrow();
 maybeFireLeaderChange(state);
 
-GracefulShutdown shutdown = this.shutdown.get();
-if (shutdown != null) {
+if (shutdown.get() != null || state.isResignRequested()) {

Review comment:
   Works for me  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


hachikuji commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r660024276



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) {
 LeaderState state = quorum.leaderStateOrThrow();
 maybeFireLeaderChange(state);
 
-GracefulShutdown shutdown = this.shutdown.get();
-if (shutdown != null) {
+if (shutdown.get() != null || state.isResignRequested()) {

Review comment:
   If you don't mind, let's do this refactor separately. There are a fair 
number of uses that would benefit from having `Optional`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


jsancio commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r660023057



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -514,6 +514,16 @@ public UnattachedState unattachedStateOrThrow() {
 throw new IllegalStateException("Expected to be Leader, but current 
state is " + state);
 }
 
+@SuppressWarnings("unchecked")
+public  Optional> maybeLeaderState() {
+EpochState state = this.state;
+if (state instanceof  LeaderState) {
+return Optional.of((LeaderState) state);
+} else {
+return Optional.empty();
+}
+}

Review comment:
   Thanks you. You read my mind. I was thinking of something similar after 
I left my previous comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


hachikuji commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r660021011



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2250,7 +2249,40 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 
 @Override
 public void resign(int epoch) {
-throw new UnsupportedOperationException();
+if (epoch < 0) {
+throw new IllegalArgumentException("Attempt to resign from an 
invalid negative epoch " + epoch);
+}
+
+if (!quorum.isVoter()) {
+throw new IllegalArgumentException("Attempt to resign by a 
non-voter");
+}
+
+LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+int currentEpoch = leaderAndEpoch.epoch();
+
+if (epoch > currentEpoch) {
+throw new IllegalArgumentException("Attempt to resign from epoch " 
+ epoch +
+" which is larger than the current epoch " + currentEpoch);
+} else if (epoch < currentEpoch) {
+// If the passed epoch is smaller than the current epoch, then it 
might mean
+// that the listener has not been notified about a leader change 
that already
+// took place. In this case, we consider the call as already 
fulfilled and
+// take no further action.
+return;
+} else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {

Review comment:
   If the epoch has moved on, then the leader check is likely to fail, so 
the current order seems to make sense. We don't keep a history of previous 
states, so I think the best we can do is catch cases where the passed epoch 
does not make sense with the current state.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


hachikuji commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r660007360



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) {
 LeaderState state = quorum.leaderStateOrThrow();
 maybeFireLeaderChange(state);
 
-GracefulShutdown shutdown = this.shutdown.get();
-if (shutdown != null) {
+if (shutdown.get() != null || state.isResignRequested()) {

Review comment:
   That's fair. Let me see if it's reasonable to do this here or if we 
should push to a separate PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

2021-06-28 Thread GitBox


mumrah commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r659989672



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2250,7 +2250,42 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 
 @Override
 public void resign(int epoch) {
-throw new UnsupportedOperationException();
+if (epoch < 0) {
+throw new IllegalArgumentException("Attempt to resign from an 
invalid negative epoch " + epoch);
+}
+
+if (!quorum.isVoter()) {
+throw new IllegalArgumentException("Attempt to resign by a 
non-voter");

Review comment:
   Would IllegalStateException be better here?

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) {
 LeaderState state = quorum.leaderStateOrThrow();
 maybeFireLeaderChange(state);
 
-GracefulShutdown shutdown = this.shutdown.get();
-if (shutdown != null) {
+if (shutdown.get() != null || state.isResignRequested()) {

Review comment:
   unrelated, but maybe worth creating helper method that returns 
`Optional` to avoid these null checks throughout

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2250,7 +2249,40 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 
 @Override
 public void resign(int epoch) {
-throw new UnsupportedOperationException();
+if (epoch < 0) {
+throw new IllegalArgumentException("Attempt to resign from an 
invalid negative epoch " + epoch);
+}
+
+if (!quorum.isVoter()) {
+throw new IllegalArgumentException("Attempt to resign by a 
non-voter");
+}
+
+LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+int currentEpoch = leaderAndEpoch.epoch();
+
+if (epoch > currentEpoch) {
+throw new IllegalArgumentException("Attempt to resign from epoch " 
+ epoch +
+" which is larger than the current epoch " + currentEpoch);
+} else if (epoch < currentEpoch) {
+// If the passed epoch is smaller than the current epoch, then it 
might mean
+// that the listener has not been notified about a leader change 
that already
+// took place. In this case, we consider the call as already 
fulfilled and
+// take no further action.
+return;
+} else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {

Review comment:
   Does it make sense to do this check before the epoch validation? If 
we're not the leader and received an old epoch (which, if i understand, seems 
likely if we're _not_ the leader anymore), we will silently ignore in the above 
case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

2021-06-28 Thread GitBox


mimaison commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-869893715


   Let's put a line in the changelog now so we're sure it's included in the 
release notes. I'm happy to merge once this is done.
   
   I agree the documentation can come later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-28 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r659995458



##
File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collection;
+
+/**
+ * A class used to represent a collection of topics. This collection may 
define topics by topic name
+ * attribute or topic ID attribute. Subclassing this class beyond the classes 
provided here is not supported.
+ */
+public abstract class TopicCollection {
+private final TopicAttribute attribute;
+
+/**
+ * An enum used to describe how topics in the collection are identified
+ */
+public enum TopicAttribute {

Review comment:
   I kept this TopicAttribute field for now, though we could case on the 
class (TopicIdCollection/TopicNameCollection) One reason I didn't remove it is 
that the result class(es) still needs it. (Since we are building off the 
existing class). If there is a way to remove from the result class, or a better 
place to put the attribute, let me know.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott opened a new pull request #10936: KAFKA-13002 Fix for non max timestamp degrade case

2021-06-28 Thread GitBox


thomaskwscott opened a new pull request #10936:
URL: https://github.com/apache/kafka/pull/10936


   KAFKA-12541 introduced a regression for listOffsets requests for non 
maxtimestamp specs. when communicating with old brokers. This PR addresss this 
case.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   Tested with new unit test for regression case.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10774) Support Describe topic using topic IDs

2021-06-28 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370749#comment-17370749
 ] 

Justine Olshan commented on KAFKA-10774:


Removed the reference to describe topics from this ticket: 
https://issues.apache.org/jira/browse/KAFKA-12976
But we will want to only return UNKNOWN_TOPIC_ID and not UNSUPPORTED_VERSION

> Support Describe topic using topic IDs
> --
>
> Key: KAFKA-10774
> URL: https://issues.apache.org/jira/browse/KAFKA-10774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs 
> to MetadataReq and can get TopicDesc by topic IDs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12976) Remove UNSUPPORTED_VERSION error from delete topics call

2021-06-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12976:

Fix Version/s: 2.8.1
   3.0.0

> Remove UNSUPPORTED_VERSION error from delete topics call
> 
>
> Key: KAFKA-12976
> URL: https://issues.apache.org/jira/browse/KAFKA-12976
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.0, 2.8.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> Originally I thought it would be useful to have an unsupported version error 
> returned when the broker's IBP did not support the operation. However, this 
> error is transient and in the -case of describe topics, it may not be 
> accurate.- Additionally, unsupported version is not retriable when the 
> scenarios that see this likely should be. 
> I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not 
> found on the broker. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12976) Remove UNSUPPORTED_VERSION error from delete topics call

2021-06-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12976.
-
Resolution: Fixed

> Remove UNSUPPORTED_VERSION error from delete topics call
> 
>
> Key: KAFKA-12976
> URL: https://issues.apache.org/jira/browse/KAFKA-12976
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.0, 2.8.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> Originally I thought it would be useful to have an unsupported version error 
> returned when the broker's IBP did not support the operation. However, this 
> error is transient and in the -case of describe topics, it may not be 
> accurate.- Additionally, unsupported version is not retriable when the 
> scenarios that see this likely should be. 
> I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not 
> found on the broker. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12976) Remove UNSUPPORTED_VERSION error from delete topics call

2021-06-28 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-12976:
---
Affects Version/s: 3.0.0
   2.8.0

> Remove UNSUPPORTED_VERSION error from delete topics call
> 
>
> Key: KAFKA-12976
> URL: https://issues.apache.org/jira/browse/KAFKA-12976
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.0, 2.8.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Originally I thought it would be useful to have an unsupported version error 
> returned when the broker's IBP did not support the operation. However, this 
> error is transient and in the -case of describe topics, it may not be 
> accurate.- Additionally, unsupported version is not retriable when the 
> scenarios that see this likely should be. 
> I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not 
> found on the broker. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12976) Remove UNSUPPORTED_VERSION error from delete topics call

2021-06-28 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-12976:
---
Description: 
Originally I thought it would be useful to have an unsupported version error 
returned when the broker's IBP did not support the operation. However, this 
error is transient and in the -case of describe topics, it may not be 
accurate.- Additionally, unsupported version is not retriable when the 
scenarios that see this likely should be. 

I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not 
found on the broker. 

  was:
Originally I thought it would be useful to have an unsupported version error 
returned when the broker's IBP did not support the operation. However, this 
error is transient and in the case of describe topics, it may not be accurate. 
Additionally, unsupported version is not retriable when the scenarios that see 
this likely should be. 

I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not 
found on the broker. 

Summary: Remove UNSUPPORTED_VERSION error from delete topics call  
(was: Remove UNSUPPORTED_VERSION error from delete and describe topics calls)

> Remove UNSUPPORTED_VERSION error from delete topics call
> 
>
> Key: KAFKA-12976
> URL: https://issues.apache.org/jira/browse/KAFKA-12976
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Originally I thought it would be useful to have an unsupported version error 
> returned when the broker's IBP did not support the operation. However, this 
> error is transient and in the -case of describe topics, it may not be 
> accurate.- Additionally, unsupported version is not retriable when the 
> scenarios that see this likely should be. 
> I propose always returning UNKNOWN_TOPIC_ID error when the topic ID is not 
> found on the broker. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10923: KAFKA-12976: Remove UNSUPPORTED_VERSION error from delete topics call

2021-06-28 Thread GitBox


hachikuji merged pull request #10923:
URL: https://github.com/apache/kafka/pull/10923


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


junrao commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r659963924



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,58 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction or segment 
split operation. We can
+// simply rename them to regular segment files. But, before renaming, we 
should figure out which
+// segments are compacted and delete these segment files: this is done by 
calculating min/maxSwapFileOffset.

Review comment:
   "which segments are compacted": .swap files are also generated from 
splitting.

##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -192,20 +230,10 @@ object LogLoader extends Logging {
 debug(s"${params.logIdentifier}Deleting stray temporary file 
${file.getAbsolutePath}")
 Files.deleteIfExists(file.toPath)
   } else if (filename.endsWith(CleanedFileSuffix)) {
-minCleanedFileOffset = Math.min(offsetFromFileName(filename), 
minCleanedFileOffset)
-cleanFiles += file
+minCleanedFileOffset = Math.min(offsetFromFile(file), 
minCleanedFileOffset)
+cleanedFiles += file
   } else if (filename.endsWith(SwapFileSuffix)) {
-// we crashed in the middle of a swap operation, to recover:
-// if a log, delete the index files, complete the swap operation later
-// if an index just delete the index files, they will be rebuilt
-val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, 
SwapFileSuffix, ""))
-info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from 
interrupted swap operation.")
-if (Log.isIndexFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-} else if (Log.isLogFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-  swapFiles += file
-}
+swapFiles += file

Review comment:
   It's possible that during renaming, we have only renamed the .log file 
to .swap, but not the corresponding index files. Should we find those .clean 
files with the same offset and rename them to .swap?

##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,58 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction or segment 
split operation. We can
+// simply rename them to regular segment files. But, before renaming, we 
should figure out which
+// segments are compacted and delete these segment files: this is done by 
calculating min/maxSwapFileOffset.
+// We store segments that require renaming in this code block, and do the 
actual renaming later.
+var minSwapFileOffset = Long.MaxValue
+var maxSwapFileOffset = Long.MinValue
+swapFiles.filter(f => Log.isLogFile(new 
File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "".foreach { f =>
+  val baseOffset = offsetFromFile(f)
+  val segment = LogSegment.open(f.getParentFile,
+baseOffset = baseOffset,
+params.config,
+time = params.time,
+fileSuffix = Log.SwapFileSuffix)
+  info(s"${params.logIdentifier}Found log file ${f.getPath} from 
interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} 
files by renaming.")
+  minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+  maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, 
maxSwapFileOffset)
+}
+
+// Second pass: delete segments that are between minSwapFileOffset and 
maxSwapFileOffset. As
+// discussed above, these segments were compacted or split but haven't 
been renamed to .delete
+// before shutting down the broker.
+for (file <- params.dir.listFiles if file.isFile) {
+  try {
+if (!file.getName.endsWith(SwapFileSuffix)) {
+  val offset = offsetFromFile(file)
+  if (offset >= 

[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2021-06-28 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370724#comment-17370724
 ] 

Jun Rao commented on KAFKA-2729:


[~l0co], thanks for reporting this. The "Cached zkVersion [212]" error 
indicates the leader epoch was changed by the controller but somehow wasn't 
propagated to the broker. Could you grep for "Partition __consumer_offsets-30" 
in the controller and state-change log and see which controller changed the 
leader epoch corresponding to zk version 212 and whether the controller tried 
to propagate that info to the brokers?

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0, 2.4.1
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Critical
> Fix For: 1.1.0
>
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the effected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

2021-06-28 Thread GitBox


cmccabe commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r659940260



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+
+/**
+ * Represents changes to the topics in the metadata image.
+ */
+public final class TopicsDelta {
+private final TopicsImage image;
+
+/**
+ * A map from topic IDs to the topic deltas for each topic. Topics which 
have been
+ * deleted will not appear in this map.
+ */
+private final Map<Uuid, TopicDelta> changedTopics = new HashMap<>();
+
+/**
+ * The IDs of topics that exist in the image but that have been deleted. 
Note that if
+ * a topic does not exist in the image, it will also not exist in this 
set. Topics
+ * that are created and then deleted within the same delta will leave no 
trace.
+ */
+private final Set<Uuid> deletedTopicIds = new HashSet<>();

Review comment:
   If the topic is created and then deleted within the same delta, the 
topic log dir is never created, so there is no need for it to be deleted.
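
   To make the bookkeeping concrete, here is a minimal sketch of the replay path under 
the semantics described in the javadoc above (the `image.getTopic` accessor and the 
exact record plumbing are assumed for illustration, not taken from the PR):

```java
// Hypothetical sketch, not the PR's code: keep the "created and deleted in
// the same delta leaves no trace" property when replaying a topic removal.
public void replay(RemoveTopicRecord record) {
    Uuid topicId = record.topicId();
    // Drop any pending changes for this topic from the current delta.
    changedTopics.remove(topicId);
    if (image.getTopic(topicId) != null) {
        // The topic exists in the base image, so remember the deletion.
        deletedTopicIds.add(topicId);
    }
    // Otherwise the topic only ever existed inside this delta: it never
    // reached the image, no log dir was created, and nothing is recorded.
}
```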




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

2021-06-28 Thread GitBox


ryannedolan commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-869824087


   I think changelog and documentation updates can come after this is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on pull request #10629: BlockingConnectorTest improvements to verify Connectors and Tasks are successfully deleted

2021-06-28 Thread GitBox


ryannedolan commented on pull request #10629:
URL: https://github.com/apache/kafka/pull/10629#issuecomment-869823002


   Good point, @C0urante. I hadn't considered how expensive these tests can be. 
I'll see if I can collapse them into fewer tests. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10629: BlockingConnectorTest improvements to verify Connectors and Tasks are successfully deleted

2021-06-28 Thread GitBox


C0urante commented on a change in pull request #10629:
URL: https://github.com/apache/kafka/pull/10629#discussion_r659903390



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
##
@@ -392,6 +392,48 @@ protected boolean checkConnectorAndTasksAreStopped(String 
connectorName) {
 && info.tasks().stream().noneMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
 }
 
+/**
+ * Assert that a connector and its tasks are deleted.
+ *
+ * @param connectorName the connector name
+ * @param detailMessage the assertion message
+ * @throws InterruptedException
+ */
+public void assertConnectorAndTasksAreDeleted(String connectorName, String 
detailMessage)

Review comment:
   Nit: do we really need the `AndTasks` part of the name? It might be fine to 
use `assertConnectorIsDeleted` instead.

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
##
@@ -392,6 +392,48 @@ protected boolean checkConnectorAndTasksAreStopped(String 
connectorName) {
 && info.tasks().stream().noneMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
 }
 
+/**
+ * Assert that a connector and its tasks are deleted.
+ *
+ * @param connectorName the connector name
+ * @param detailMessage the assertion message
+ * @throws InterruptedException
+ */
+public void assertConnectorAndTasksAreDeleted(String connectorName, String 
detailMessage)
+throws InterruptedException {
+try {
+waitForCondition(
+() -> checkConnectorAndTasksAreDeleted(connectorName),
+CONNECTOR_SETUP_DURATION_MS,
+"At least the connector or one of its tasks still exists.");
+} catch (AssertionError e) {
+throw new AssertionError(detailMessage, e);
+}
+}
+
+/**
+ * Check whether the connector and all of its tasks have been deleted.
+ *
+ * @param connectorName the connector
+ * @return true if the connector and all of its tasks have been deleted; false otherwise
+ */
+protected boolean checkConnectorAndTasksAreDeleted(String connectorName) {
+ConnectorStateInfo info;
+try {
+info = connect.connectorStatus(connectorName);
+} catch (ConnectRestException e) {
+return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
+} catch (Exception e) {
+log.error("Could not check connector state info.", e);
+return false;
+}
+if (info == null) {
+return true;
+}

Review comment:
   Do we know when this might happen? My understanding is that part of the 
contract we're trying to test here is that Connect gives back a 404 response 
when a stalled connector is deleted; do we want to relax our expectations to 
also permit this case?
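
   As a concrete illustration of that stricter contract, a sketch of a tightened 
variant of the helper in this diff (reusing only the calls shown above; not a 
proposed patch):

```java
// Hypothetical variant: deletion is confirmed only by a 404 from the REST API.
protected boolean checkConnectorAndTasksAreDeleted(String connectorName) {
    try {
        connect.connectorStatus(connectorName);
        // The status endpoint still answers, so the connector is not deleted yet.
        return false;
    } catch (ConnectRestException e) {
        return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
    } catch (Exception e) {
        log.error("Could not check connector state info.", e);
        return false;
    }
}
```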

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
##
@@ -596,6 +662,8 @@ public void start(Map props) {
 @Override
 public List poll() {
 block.maybeBlockOn(SOURCE_TASK_POLL);
+// even when not blocking, pause to prevent a tight loop
+Utils.sleep(1000);

Review comment:
   Good call  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-28 Thread GitBox


junrao commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r659914342



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,63 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction or segment 
split operation. We can
+// simply rename them to regular segment files. But, before renaming, we 
should figure out which
+// segments are compacted and delete these segment files: this is done by 
calculating min/maxSwapFileOffset.
+// We store segments that require renaming in this code block, and do the 
actual renaming later.
+var minSwapFileOffset = Long.MaxValue
+var maxSwapFileOffset = Long.MinValue
+val toRenameSwapFiles = mutable.Set[File]()
+swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+  val baseOffset = offsetFromFile(f)
+  val segment = LogSegment.open(f.getParentFile,
+baseOffset = baseOffset,
+params.config,
+time = params.time,
+fileSuffix = Log.SwapFileSuffix)
+  toRenameSwapFiles += f
+  info(s"${params.logIdentifier}Found log file ${f.getPath} from 
interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} 
files by renaming.")
+  minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+  maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, 
maxSwapFileOffset)

Review comment:
   > I could be wrong, but I think if it is compaction, the last record 
will never be removed. The reason is that compaction always removes earlier 
records of each key, and the last record will never be an earlier one.
   > 
   > Split should be similar.
   
   It's true that we generally don't remove the last record during compaction. 
However, during a round of cleaning, we clean segments in groups and each group 
generates a single .clean file. Groups are formed so that the offset gap stays 
within 2 billion and the .clean file won't exceed 2GB in size. If multiple 
groups are formed, it's possible that a group other than the last one doesn't 
preserve the last record.
   
   > How can we get the next segment before finishing the recovery process?
   
   We could potentially scan all .log files and sort them in offset order.
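
   For what it's worth, a minimal illustration of that idea (plain Java here 
rather than the Scala in LogLoader, and the helper names are mine): list the 
directory once, keep only the .log files, and sort them by the base offset 
parsed from the file name.

```java
// Hypothetical sketch of "scan all .log files and sort them in offset order".
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public final class LogFileScan {
    // Parse the base offset from a segment file name such as "00000000000000012345.log".
    static long baseOffset(File f) {
        String name = f.getName();
        return Long.parseLong(name.substring(0, name.indexOf('.')));
    }

    static List<File> logFilesInOffsetOrder(File dir) {
        File[] files = dir.listFiles();
        if (files == null) {
            return Collections.emptyList();
        }
        return Arrays.stream(files)
            .filter(File::isFile)
            .filter(f -> f.getName().endsWith(".log"))
            .sorted(Comparator.comparingLong(LogFileScan::baseOffset))
            .collect(Collectors.toList());
    }
}
```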




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12958) Add simulation invariant for leadership and snapshot

2021-06-28 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370679#comment-17370679
 ] 

Jose Armando Garcia Sancio commented on KAFKA-12958:


Thanks [~zhaohaidao]. I'll try to take a look at your PR today. Not sure if 
this is what is happening, but the `KafkaRaftClient` may have multiple 
registered `RaftClient.Listener` instances. If some of them have not been 
notified of their leadership through `handleLeaderChange`, it is okay for them 
to see calls to `handleSnapshot`.
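
A rough sketch of what the invariant could look like on the state-machine side 
(the tracker and method names here are illustrative; they are not the actual 
`RaftClient.Listener` signatures):

{code:java}
// Hypothetical invariant tracker for a single listener; not actual Kafka code.
public final class SnapshotInvariant {
    private boolean notifiedAsLeader = false;

    // Call from the listener's leader-change callback.
    public synchronized void onLeaderChange(boolean isLeader) {
        notifiedAsLeader = isLeader;
    }

    // Call from the listener's snapshot callback before loading the snapshot.
    public synchronized void onSnapshotLoad() {
        if (notifiedAsLeader) {
            throw new IllegalStateException("Invariant violated: a listener that " +
                "was notified of its own leadership should never be asked to load a snapshot");
        }
    }
}
{code}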

> Add simulation invariant for leadership and snapshot
> 
>
> Key: KAFKA-12958
> URL: https://issues.apache.org/jira/browse/KAFKA-12958
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: HaiyuanZhao
>Priority: Major
> Attachments: image-2021-06-27-02-09-25-296.png, 
> image-2021-06-27-02-15-23-760.png, image-2021-06-27-02-26-48-368.png, 
> image-2021-06-27-02-27-41-966.png
>
>
> During the simulation we should add an invariant that notified leaders are 
> never asked to load snapshots. The state machine always sees the following 
> sequence of callback calls:
> Leaders see:
> ...
> handleLeaderChange state machine is notified of leadership
> handleSnapshot is never called
> Non-leaders see:
> ...
> handleLeaderChange state machine is notified that it is not the leader
> handleSnapshot is called 0 or more times



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

2021-06-28 Thread GitBox


guozhangwang commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r659908813



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
 @SuppressWarnings("unchecked")
 private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store) {
+if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {
+return;
+}
+
 try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue<V1, V2>> it = store.all()) {
 while (it.hasNext()) {
 final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue<V1, V2>> record = it.next();
 
 final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
 final LeftOrRightValue<V1, V2> value = record.value;
+minTime.minTime = windowedKey.window().start();

Review comment:
   Instead of trying to update minTime on each record, could we just set it 
once at line 207 below, and set it to MAX if we've exhausted all records 
(as @spena indicated above)?
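
   Roughly something like this, perhaps (just a sketch of the idea; 
`stillWithinJoinWindow` stands in for the existing expiry condition and is not 
a real method):

```java
// Hypothetical shape of the suggestion: set minTime once, and park it at MAX
// when every buffered record has been emitted.
long newMinTime = Long.MAX_VALUE;  // kept if we drain the whole store
try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue<V1, V2>> it = store.all()) {
    while (it.hasNext()) {
        final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue<V1, V2>> record = it.next();
        if (stillWithinJoinWindow(record)) {  // placeholder for the existing "not yet expired" check
            // The first record we cannot emit yet becomes the new minimum; stop scanning.
            newMinTime = record.key.window().start();
            break;
        }
        // ... emit the non-joined record and remove it from the store ...
    }
}
minTime.minTime = newMinTime;
```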

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -69,6 +69,10 @@ public long get() {
 }
 }
 
+static class MinTime {

Review comment:
   Could we merge this and MaxObservedStreamTime into a single class to be 
shared among operator nodes?
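
   For illustration, the merged helper might look roughly like this (the name 
and the exact accessors are assumptions):

```java
// Hypothetical shared tracker merging MaxObservedStreamTime and MinTime.
static class TimeTracker {
    private long maxObservedStreamTime = Long.MIN_VALUE; // latest stream time seen by the join
    private long minTime = Long.MAX_VALUE;               // earliest window start still buffered

    void advanceStreamTime(final long streamTime) {
        maxObservedStreamTime = Math.max(maxObservedStreamTime, streamTime);
    }

    long streamTime() {
        return maxObservedStreamTime;
    }

    void setMinTime(final long minTime) {
        this.minTime = minTime;
    }

    long minTime() {
        return minTime;
    }
}
```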




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10934: [DO NOT MERGE] Scala3 test

2021-06-28 Thread GitBox


jlprat commented on pull request #10934:
URL: https://github.com/apache/kafka/pull/10934#issuecomment-869779558


   > Thanks for the PR. If I understand correctly, this assumes we would drop 
Scala 2.12 compatibility. That makes sense, but it does mean waiting quite a 
while (12+ months).
   
   Shall I try to work on supporting the 3 Scala versions?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

2021-06-28 Thread GitBox


ijuma commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-869776967


   Thanks for the quick investigation!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

2021-06-28 Thread GitBox


dajac commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-869772020


   We have found the issue. @thomaskwscott is working on the fix. My bad, I 
missed one case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13002) dev branch Streams not able to fetch end offsets from pre-3.0 brokers

2021-06-28 Thread Tom Scott (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17370653#comment-17370653
 ] 

Tom Scott commented on KAFKA-13002:
---

I think I've determined the problem; I'll get a PR together ASAP.

> dev branch Streams not able to fetch end offsets from pre-3.0 brokers
> -
>
> Key: KAFKA-13002
> URL: https://issues.apache.org/jira/browse/KAFKA-13002
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: John Roesler
>Assignee: Tom Scott
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: soaks.png
>
>
> Note: this is not a report against a released version of AK. It seems to be a 
> problem on the trunk development branch only.
> After deploying our soak test against `trunk/HEAD` on Friday, I noticed that 
> Streams is no longer processing:
> !soaks.png!
> I found this stacktrace in the logs during startup:
> {code:java}
> 5075 [2021-06-25T16:50:44-05:00] 
> (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) 
> [2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The 
> listOffsets request failed. 
> (org.apache.kafka.streams.processor.internals.ClientUtils)
>  5076 [2021-06-25T16:50:44-05:00] 
> (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
> not support LIST_OFFSETS with version in range [7,7].   The supported 
> range is [0,6].
>  5077 at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  5078 at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  5079 at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  5080 at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  5081 at 
> org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
>  5082 at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
>  5083 at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
>  5084 at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
>  5085 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
>  5086 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>  5087 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
>  5088 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
>  5089 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
>  5090 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
>  5091 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
>  5092 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>  5093 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>  5094 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>  5095 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>  5096 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>  5097 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  5098 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  5099 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
>  5100 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>  5101 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>  5102 at 
> 

[jira] [Assigned] (KAFKA-13002) dev branch Streams not able to fetch end offsets from pre-3.0 brokers

2021-06-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-13002:
---

Assignee: Tom Scott  (was: David Jacot)

> dev branch Streams not able to fetch end offsets from pre-3.0 brokers
> -
>
> Key: KAFKA-13002
> URL: https://issues.apache.org/jira/browse/KAFKA-13002
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: John Roesler
>Assignee: Tom Scott
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: soaks.png
>
>
> Note: this is not a report against a released version of AK. It seems to be a 
> problem on the trunk development branch only.
> After deploying our soak test against `trunk/HEAD` on Friday, I noticed that 
> Streams is no longer processing:
> !soaks.png!
> I found this stacktrace in the logs during startup:
> {code:java}
> 5075 [2021-06-25T16:50:44-05:00] 
> (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) 
> [2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The 
> listOffsets request failed. 
> (org.apache.kafka.streams.processor.internals.ClientUtils)
>  5076 [2021-06-25T16:50:44-05:00] 
> (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
> not support LIST_OFFSETS with version in range [7,7].   The supported 
> range is [0,6].
>  5077 at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  5078 at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  5079 at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  5080 at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  5081 at 
> org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
>  5082 at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
>  5083 at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
>  5084 at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
>  5085 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
>  5086 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>  5087 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
>  5088 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
>  5089 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
>  5090 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
>  5091 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
>  5092 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>  5093 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>  5094 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>  5095 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>  5096 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>  5097 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  5098 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  5099 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
>  5100 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>  5101 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>  5102 at 
> 

[GitHub] [kafka] guozhangwang commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs

2021-06-28 Thread GitBox


guozhangwang commented on pull request #10930:
URL: https://github.com/apache/kafka/pull/10930#issuecomment-869764069


   @rajinisivaram I think my confusion comes from `ReplicaFetcherThread throws 
OffsetOutOfRangeException when processing responses with Errors.NONE if the 
leader's offsets in the response are out of range and this moves the partition 
to failed state. ` Could you point me to the code where this currently 
happens? Also, since we are fixing the logic on the leader now, I'm wondering 
whether we still need that logic if it does exist.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



