This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1dd1e7f KAFKA-10545: Create topic IDs and propagate to brokers (#9626)
1dd1e7f is described below
commit 1dd1e7f945d7a8c1dc177223cd88800680f1ff46
Author: Justine Olshan <[email protected]>
AuthorDate: Fri Dec 18 17:19:50 2020 -0500
KAFKA-10545: Create topic IDs and propagate to brokers (#9626)
This change propagates topic IDs to brokers in the LeaderAndIsr request. It
also removes the topic name from the LeaderAndIsr response, reorganizes the
response so that partitions are grouped by topic, and includes the topic ID.
In addition, the topic ID is persisted to each replica in Log as well as in
a file on disk. This file is read on startup and if the topic ID exists, it
will be reloaded.
Reviewers: David Jacot <[email protected]>, dengziming
<[email protected]>, Nikhil Bhatia <[email protected]>, Rajini
Sivaram <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/common/requests/LeaderAndIsrRequest.java | 57 ++++++--
.../common/requests/LeaderAndIsrResponse.java | 58 +++++++--
.../common/message/LeaderAndIsrRequest.json | 12 +-
.../common/message/LeaderAndIsrResponse.json | 24 +++-
.../common/requests/LeaderAndIsrRequestTest.java | 38 +++++-
.../common/requests/LeaderAndIsrResponseTest.java | 114 ++++++++++++----
.../kafka/common/requests/RequestResponseTest.java | 60 ++++++---
core/src/main/scala/kafka/api/ApiVersion.scala | 2 +-
.../controller/ControllerChannelManager.scala | 10 +-
.../scala/kafka/controller/KafkaController.scala | 7 +-
core/src/main/scala/kafka/log/Log.scala | 21 ++-
.../scala/kafka/server/PartitionMetadataFile.scala | 144 +++++++++++++++++++++
.../main/scala/kafka/server/ReplicaManager.scala | 63 +++++++--
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 9 +-
.../controller/ControllerChannelManagerTest.scala | 46 +++++--
.../test/scala/unit/kafka/log/LogManagerTest.scala | 8 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 46 ++++++-
.../kafka/server/BrokerEpochIntegrationTest.scala | 5 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 5 +-
.../unit/kafka/server/LeaderElectionTest.scala | 7 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 136 ++++++++++++++++++-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +
.../unit/kafka/server/ServerShutdownTest.scala | 5 +-
.../apache/kafka/message/MessageDataGenerator.java | 1 -
26 files changed, 760 insertions(+), 124 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index b44e713..8539034 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -100,7 +100,7 @@
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
<suppress checks="NPathComplexity"
-
files="MemoryRecordsTest|MetricsTest|TestSslUtils|AclAuthorizerBenchmark"/>
+
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
<suppress
checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 833e025..939212a 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -17,11 +17,13 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader;
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopicState;
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
+import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -43,12 +45,15 @@ public class LeaderAndIsrRequest extends
AbstractControlRequest {
public static class Builder extends
AbstractControlRequest.Builder<LeaderAndIsrRequest> {
private final List<LeaderAndIsrPartitionState> partitionStates;
+ private final Map<String, Uuid> topicIds;
private final Collection<Node> liveLeaders;
public Builder(short version, int controllerId, int controllerEpoch,
long brokerEpoch,
- List<LeaderAndIsrPartitionState> partitionStates,
Collection<Node> liveLeaders) {
+ List<LeaderAndIsrPartitionState> partitionStates,
Map<String, Uuid> topicIds,
+ Collection<Node> liveLeaders) {
super(ApiKeys.LEADER_AND_ISR, version, controllerId,
controllerEpoch, brokerEpoch);
this.partitionStates = partitionStates;
+ this.topicIds = topicIds;
this.liveLeaders = liveLeaders;
}
@@ -67,7 +72,7 @@ public class LeaderAndIsrRequest extends
AbstractControlRequest {
.setLiveLeaders(leaders);
if (version >= 2) {
- Map<String, LeaderAndIsrTopicState> topicStatesMap =
groupByTopic(partitionStates);
+ Map<String, LeaderAndIsrTopicState> topicStatesMap =
groupByTopic(partitionStates, topicIds);
data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
} else {
data.setUngroupedPartitionStates(partitionStates);
@@ -76,13 +81,14 @@ public class LeaderAndIsrRequest extends
AbstractControlRequest {
return new LeaderAndIsrRequest(data, version);
}
- private static Map<String, LeaderAndIsrTopicState>
groupByTopic(List<LeaderAndIsrPartitionState> partitionStates) {
+ private static Map<String, LeaderAndIsrTopicState>
groupByTopic(List<LeaderAndIsrPartitionState> partitionStates, Map<String,
Uuid> topicIds) {
Map<String, LeaderAndIsrTopicState> topicStates = new HashMap<>();
// We don't null out the topic name in
LeaderAndIsrRequestPartition since it's ignored by
// the generated code if version >= 2
for (LeaderAndIsrPartitionState partition : partitionStates) {
- LeaderAndIsrTopicState topicState =
topicStates.computeIfAbsent(partition.topicName(),
- t -> new
LeaderAndIsrTopicState().setTopicName(partition.topicName()));
+ LeaderAndIsrTopicState topicState =
topicStates.computeIfAbsent(partition.topicName(), t -> new
LeaderAndIsrTopicState()
+ .setTopicName(partition.topicName())
+
.setTopicId(topicIds.getOrDefault(partition.topicName(), Uuid.ZERO_UUID)));
topicState.partitionStates().add(partition);
}
return topicStates;
@@ -96,6 +102,7 @@ public class LeaderAndIsrRequest extends
AbstractControlRequest {
.append(", controllerEpoch=").append(controllerEpoch)
.append(", brokerEpoch=").append(brokerEpoch)
.append(", partitionStates=").append(partitionStates)
+ .append(", topicIds=").append(topicIds)
.append(", liveLeaders=(").append(Utils.join(liveLeaders, ",
")).append(")")
.append(")");
return bld.toString();
@@ -129,15 +136,34 @@ public class LeaderAndIsrRequest extends
AbstractControlRequest {
Errors error = Errors.forException(e);
responseData.setErrorCode(error.code());
- List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
- for (LeaderAndIsrPartitionState partition : partitionStates()) {
- partitions.add(new LeaderAndIsrPartitionError()
- .setTopicName(partition.topicName())
- .setPartitionIndex(partition.partitionIndex())
- .setErrorCode(error.code()));
+ if (version() < 5) {
+ List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+ for (LeaderAndIsrPartitionState partition : partitionStates()) {
+ partitions.add(new LeaderAndIsrPartitionError()
+ .setTopicName(partition.topicName())
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(error.code()));
+ }
+ responseData.setPartitionErrors(partitions);
+ return new LeaderAndIsrResponse(responseData, version());
+ }
+
+ List<LeaderAndIsrTopicError> topics = new
ArrayList<>(data.topicStates().size());
+ Map<String, Uuid> topicIds = topicIds();
+ for (LeaderAndIsrTopicState topicState : data.topicStates()) {
+ LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
+ topicError.setTopicId(topicIds.get(topicState.topicName()));
+ List<LeaderAndIsrPartitionError> partitions = new
ArrayList<>(topicState.partitionStates().size());
+ for (LeaderAndIsrPartitionState partition :
topicState.partitionStates()) {
+ partitions.add(new LeaderAndIsrPartitionError()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(error.code()));
+ }
+ topicError.setPartitionErrors(partitions);
+ topics.add(topicError);
}
- responseData.setPartitionErrors(partitions);
- return new LeaderAndIsrResponse(responseData);
+ responseData.setTopics(topics);
+ return new LeaderAndIsrResponse(responseData, version());
}
@Override
@@ -162,6 +188,11 @@ public class LeaderAndIsrRequest extends
AbstractControlRequest {
return data.ungroupedPartitionStates();
}
+ public Map<String, Uuid> topicIds() {
+ return data.topicStates().stream()
+ .collect(Collectors.toMap(LeaderAndIsrTopicState::topicName,
LeaderAndIsrTopicState::topicId));
+ }
+
public List<LeaderAndIsrLiveLeader> liveLeaders() {
return Collections.unmodifiableList(data.liveLeaders());
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 974dde8..60ab3d5 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -16,15 +16,20 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
+import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.FlattenedIterator;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
+import java.util.HashMap;
import java.util.Map;
public class LeaderAndIsrResponse extends AbstractResponse {
@@ -36,14 +41,24 @@ public class LeaderAndIsrResponse extends AbstractResponse {
* STALE_BROKER_EPOCH (77)
*/
private final LeaderAndIsrResponseData data;
+ private short version;
- public LeaderAndIsrResponse(LeaderAndIsrResponseData data) {
+ public LeaderAndIsrResponse(LeaderAndIsrResponseData data, short version) {
super(ApiKeys.LEADER_AND_ISR);
this.data = data;
+ this.version = version;
}
- public List<LeaderAndIsrPartitionError> partitions() {
- return data.partitionErrors();
+ public List<LeaderAndIsrTopicError> topics() {
+ return this.data.topics();
+ }
+
+ public Iterable<LeaderAndIsrPartitionError> partitions() {
+ if (version < 5) {
+ return data.partitionErrors();
+ }
+ return () -> new FlattenedIterator<>(data.topics().iterator(),
+ topic -> topic.partitionErrors().iterator());
}
public Errors error() {
@@ -53,22 +68,49 @@ public class LeaderAndIsrResponse extends AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
Errors error = error();
- if (error != Errors.NONE)
+ if (error != Errors.NONE) {
// Minor optimization since the top-level error applies to all
partitions
- return Collections.singletonMap(error,
data.partitionErrors().size() + 1);
- Map<Errors, Integer> errors =
errorCounts(data.partitionErrors().stream().map(l ->
Errors.forCode(l.errorCode())));
- // Top level error
+ if (version < 5)
+ return Collections.singletonMap(error,
data.partitionErrors().size() + 1);
+ return Collections.singletonMap(error,
+ data.topics().stream().mapToInt(t ->
t.partitionErrors().size()).sum() + 1);
+ }
+ Map<Errors, Integer> errors;
+ if (version < 5)
+ errors = errorCounts(data.partitionErrors().stream().map(l ->
Errors.forCode(l.errorCode())));
+ else
+ errors = errorCounts(data.topics().stream().flatMap(t ->
t.partitionErrors().stream()).map(l ->
+ Errors.forCode(l.errorCode())));
updateErrorCounts(errors, Errors.NONE);
return errors;
}
+ public Map<TopicPartition, Errors> partitionErrors(Map<Uuid, String>
topicNames) {
+ Map<TopicPartition, Errors> errors = new HashMap<>();
+ if (version < 5) {
+ data.partitionErrors().forEach(partition ->
+ errors.put(new TopicPartition(partition.topicName(),
partition.partitionIndex()),
+ Errors.forCode(partition.errorCode())));
+ } else {
+ for (LeaderAndIsrTopicError topic : data.topics()) {
+ String topicName = topicNames.get(topic.topicId());
+ if (topicName != null) {
+ topic.partitionErrors().forEach(partition ->
+ errors.put(new TopicPartition(topicName,
partition.partitionIndex()),
+ Errors.forCode(partition.errorCode())));
+ }
+ }
+ }
+ return errors;
+ }
+
@Override
public int throttleTimeMs() {
return DEFAULT_THROTTLE_TIME;
}
public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version)
{
- return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new
ByteBufferAccessor(buffer), version));
+ return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new
ByteBufferAccessor(buffer), version), version);
}
@Override
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index 8529688..129b7f7 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -21,8 +21,12 @@
//
// Version 2 adds broker epoch and reorganizes the partitions by topic.
//
- // Version 3 adds AddingReplicas and RemovingReplicas
- "validVersions": "0-4",
+ // Version 3 adds AddingReplicas and RemovingReplicas.
+ //
+ // Version 4 is the first flexible version.
+ //
+ // Version 5 adds Type to the request and Topic ID to the TopicStates, as
described in KIP-516.
+ "validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
@@ -31,6 +35,8 @@
"about": "The current controller epoch." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable":
true, "default": "-1",
"about": "The current broker epoch." },
+ { "name": "Type", "type": "int8", "versions": "5+",
+ "about": "The type that indicates whether all topics are included in the
request"},
{ "name": "UngroupedPartitionStates", "type":
"[]LeaderAndIsrPartitionState", "versions": "0-1",
"about": "The state of each partition, in a v0 or v1 message." },
// In v0 or v1 requests, each partition is listed alongside its topic name.
@@ -40,6 +46,8 @@
"about": "Each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "2+", "entityType":
"topicName",
"about": "The topic name." },
+ { "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
+ "about": "The unique topic ID." },
{ "name": "PartitionStates", "type": "[]LeaderAndIsrPartitionState",
"versions": "2+",
"about": "The state of each partition" }
]},
diff --git
a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
index 10c3cd9..dc5879b 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
@@ -22,15 +22,29 @@
// Version 2 is the same as version 1.
//
// Version 3 is the same as version 2.
- "validVersions": "0-4",
+ //
+ // Version 4 is the first flexible version.
+ //
+ // Version 5 removes TopicName and replaces it with TopicId and reorganizes
+ // the partitions by topic, as described by KIP-516.
+ "validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
- { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError",
"versions": "0+",
- "about": "Each partition.", "fields": [
- { "name": "TopicName", "type": "string", "versions": "0+", "entityType":
"topicName",
- "about": "The topic name." },
+ { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError",
"versions": "0-4",
+ "about": "Each partition in v0 to v4 message."},
+ { "name": "Topics", "type": "[]LeaderAndIsrTopicError", "versions": "5+",
+ "about": "Each topic", "fields": [
+ { "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The
unique topic ID" },
+ { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError",
"versions": "5+",
+ "about": "Each partition."}
+ ]}
+ ],
+ "commonStructs": [
+ { "name": "LeaderAndIsrPartitionError", "versions": "0+", "fields": [
+ { "name": "TopicName", "type": "string", "versions": "0-4",
"entityType": "topicName", "ignorable": true,
+ "about": "The topic name."},
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
index 939514e..c45682f 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
@@ -31,8 +32,10 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -50,19 +53,25 @@ public class LeaderAndIsrRequestTest {
public void testUnsupportedVersion() {
LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(
(short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0,
- Collections.emptyList(), Collections.emptySet());
+ Collections.emptyList(), Collections.emptyMap(),
Collections.emptySet());
assertThrows(UnsupportedVersionException.class, builder::build);
}
@Test
public void testGetErrorResponse() {
+ Uuid id = Uuid.randomUuid();
for (short version = LEADER_AND_ISR.oldestVersion(); version <
LEADER_AND_ISR.latestVersion(); version++) {
LeaderAndIsrRequest.Builder builder = new
LeaderAndIsrRequest.Builder(version, 0, 0, 0,
- Collections.emptyList(), Collections.emptySet());
+ Collections.emptyList(), Collections.singletonMap("topic",
id), Collections.emptySet());
LeaderAndIsrRequest request = builder.build();
LeaderAndIsrResponse response = request.getErrorResponse(0,
new ClusterAuthorizationException("Not authorized"));
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED,
response.error());
+ if (version < 5) {
+ assertEquals(0, response.topics().size());
+ } else {
+ assertEquals(id, response.topics().get(0).topicId());
+ }
}
}
@@ -115,8 +124,13 @@ public class LeaderAndIsrRequestTest {
new Node(0, "host0", 9090),
new Node(1, "host1", 9091)
);
+
+ Map<String, Uuid> topicIds = new HashMap<>();
+ topicIds.put("topic0", Uuid.randomUuid());
+ topicIds.put("topic1", Uuid.randomUuid());
+
LeaderAndIsrRequest request = new
LeaderAndIsrRequest.Builder(version, 1, 2, 3, partitionStates,
- liveNodes).build();
+ topicIds, liveNodes).build();
List<LeaderAndIsrLiveLeader> liveLeaders =
liveNodes.stream().map(n -> new LeaderAndIsrLiveLeader()
.setBrokerId(n.id())
@@ -140,7 +154,21 @@ public class LeaderAndIsrRequestTest {
.setRemovingReplicas(emptyList());
}
+ // Prior to version 2, there were no TopicStates, so a map of
Topic Ids from a list of
+ // TopicStates is an empty map.
+ if (version < 2) {
+ topicIds = new HashMap<>();
+ }
+
+ // In versions 2-4 there are TopicStates, but no topicIds, so
deserialized requests will have
+ // Zero Uuids in place.
+ if (version > 1 && version < 5) {
+ topicIds.put("topic0", Uuid.ZERO_UUID);
+ topicIds.put("topic1", Uuid.ZERO_UUID);
+ }
+
assertEquals(new HashSet<>(partitionStates),
iterableToSet(deserializedRequest.partitionStates()));
+ assertEquals(topicIds, deserializedRequest.topicIds());
assertEquals(liveLeaders, deserializedRequest.liveLeaders());
assertEquals(1, request.controllerId());
assertEquals(2, request.controllerEpoch());
@@ -152,13 +180,15 @@ public class LeaderAndIsrRequestTest {
public void testTopicPartitionGroupingSizeReduction() {
Set<TopicPartition> tps = TestUtils.generateRandomTopicPartitions(10,
10);
List<LeaderAndIsrPartitionState> partitionStates = new ArrayList<>();
+ Map<String, Uuid> topicIds = new HashMap<>();
for (TopicPartition tp : tps) {
partitionStates.add(new LeaderAndIsrPartitionState()
.setTopicName(tp.topic())
.setPartitionIndex(tp.partition()));
+ topicIds.put(tp.topic(), Uuid.randomUuid());
}
LeaderAndIsrRequest.Builder builder = new
LeaderAndIsrRequest.Builder((short) 2, 0, 0, 0,
- partitionStates, Collections.emptySet());
+ partitionStates, topicIds, Collections.emptySet());
LeaderAndIsrRequest v2 = builder.build((short) 2);
LeaderAndIsrRequest v1 = builder.build((short) 1);
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
index fbd7d48..9940e55 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.Uuid;
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
+import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -29,6 +31,7 @@ import java.util.List;
import java.util.Map;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -57,46 +60,87 @@ public class LeaderAndIsrResponseTest {
.setZkVersion(20)
.setReplicas(Collections.singletonList(10))
.setIsNew(false));
+ Map<String, Uuid> topicIds = Collections.singletonMap("foo",
Uuid.randomUuid());
+
LeaderAndIsrRequest request = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(),
- 15, 20, 0, partitionStates, Collections.emptySet()).build();
+ 15, 20, 0, partitionStates, topicIds,
Collections.emptySet()).build();
LeaderAndIsrResponse response = request.getErrorResponse(0,
Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 3),
response.errorCounts());
}
@Test
public void testErrorCountsWithTopLevelError() {
- List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
- asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
- LeaderAndIsrResponse response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
- .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
- .setPartitionErrors(partitions));
- assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 3),
response.errorCounts());
+ for (short version = LEADER_AND_ISR.oldestVersion(); version <
LEADER_AND_ISR.latestVersion(); version++) {
+ LeaderAndIsrResponse response;
+ if (version < 5) {
+ List<LeaderAndIsrPartitionError> partitions =
createPartitions("foo",
+ asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
+ response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setPartitionErrors(partitions), version);
+ } else {
+ Uuid id = Uuid.randomUuid();
+ List<LeaderAndIsrTopicError> topics = createTopic(id,
asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
+ response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setTopics(topics), version);
+ }
+ assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR,
3), response.errorCounts());
+ }
}
@Test
public void testErrorCountsNoTopLevelError() {
- List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
- asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
- LeaderAndIsrResponse response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
- .setErrorCode(Errors.NONE.code())
- .setPartitionErrors(partitions));
- Map<Errors, Integer> errorCounts = response.errorCounts();
- assertEquals(2, errorCounts.size());
- assertEquals(2, errorCounts.get(Errors.NONE).intValue());
- assertEquals(1,
errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue());
+ for (short version = LEADER_AND_ISR.oldestVersion(); version <
LEADER_AND_ISR.latestVersion(); version++) {
+ LeaderAndIsrResponse response;
+ if (version < 5) {
+ List<LeaderAndIsrPartitionError> partitions =
createPartitions("foo",
+ asList(Errors.NONE,
Errors.CLUSTER_AUTHORIZATION_FAILED));
+ response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setPartitionErrors(partitions), version);
+ } else {
+ Uuid id = Uuid.randomUuid();
+ List<LeaderAndIsrTopicError> topics = createTopic(id,
asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
+ response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setTopics(topics), version);
+ }
+ Map<Errors, Integer> errorCounts = response.errorCounts();
+ assertEquals(2, errorCounts.size());
+ assertEquals(2, errorCounts.get(Errors.NONE).intValue());
+ assertEquals(1,
errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue());
+ }
}
@Test
public void testToString() {
- List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
- asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
- LeaderAndIsrResponse response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
- .setErrorCode(Errors.NONE.code())
- .setPartitionErrors(partitions));
- String responseStr = response.toString();
-
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
- assertTrue(responseStr.contains(partitions.toString()));
- assertTrue(responseStr.contains("errorCode=" + Errors.NONE.code()));
+ for (short version = LEADER_AND_ISR.oldestVersion(); version <
LEADER_AND_ISR.latestVersion(); version++) {
+ LeaderAndIsrResponse response;
+ if (version < 5) {
+ List<LeaderAndIsrPartitionError> partitions =
createPartitions("foo",
+ asList(Errors.NONE,
Errors.CLUSTER_AUTHORIZATION_FAILED));
+ response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setPartitionErrors(partitions), version);
+ String responseStr = response.toString();
+
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
+ assertTrue(responseStr.contains(partitions.toString()));
+ assertTrue(responseStr.contains("errorCode=" +
Errors.NONE.code()));
+
+ } else {
+ Uuid id = Uuid.randomUuid();
+ List<LeaderAndIsrTopicError> topics = createTopic(id,
asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
+ response = new LeaderAndIsrResponse(new
LeaderAndIsrResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setTopics(topics), version);
+ String responseStr = response.toString();
+
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
+ assertTrue(responseStr.contains(topics.toString()));
+ assertTrue(responseStr.contains(id.toString()));
+ assertTrue(responseStr.contains("errorCode=" +
Errors.NONE.code()));
+ }
+ }
}
private List<LeaderAndIsrPartitionError> createPartitions(String
topicName, List<Errors> errors) {
@@ -104,11 +148,27 @@ public class LeaderAndIsrResponseTest {
int partitionIndex = 0;
for (Errors error : errors) {
partitions.add(new LeaderAndIsrPartitionError()
- .setTopicName(topicName)
+ .setTopicName(topicName)
+ .setPartitionIndex(partitionIndex++)
+ .setErrorCode(error.code()));
+ }
+ return partitions;
+ }
+
+ private List<LeaderAndIsrTopicError> createTopic(Uuid id, List<Errors>
errors) {
+ List<LeaderAndIsrTopicError> topics = new ArrayList<>();
+ LeaderAndIsrTopicError topic = new LeaderAndIsrTopicError();
+ topic.setTopicId(id);
+ List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+ int partitionIndex = 0;
+ for (Errors error : errors) {
+ partitions.add(new LeaderAndIsrPartitionError()
.setPartitionIndex(partitionIndex++)
.setErrorCode(error.code()));
}
- return partitions;
+ topic.setPartitionErrors(partitions);
+ topics.add(topic);
+ return topics;
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8f9a4dc..fdf541c 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
@@ -320,13 +321,12 @@ public class RequestResponseTest {
checkResponse(createStopReplicaResponse(), v, true);
}
- checkRequest(createLeaderAndIsrRequest(0), true);
- checkErrorResponse(createLeaderAndIsrRequest(0),
unknownServerException, false);
- checkRequest(createLeaderAndIsrRequest(1), true);
- checkErrorResponse(createLeaderAndIsrRequest(1),
unknownServerException, false);
- checkRequest(createLeaderAndIsrRequest(2), true);
- checkErrorResponse(createLeaderAndIsrRequest(2),
unknownServerException, false);
- checkResponse(createLeaderAndIsrResponse(), 0, true);
+ for (int v = ApiKeys.LEADER_AND_ISR.oldestVersion(); v <=
ApiKeys.LEADER_AND_ISR.latestVersion(); v++) {
+ checkRequest(createLeaderAndIsrRequest(v), true);
+ checkErrorResponse(createLeaderAndIsrRequest(v),
unknownServerException, false);
+ checkResponse(createLeaderAndIsrResponse(v), v, true);
+ }
+
checkRequest(createSaslHandshakeRequest(), true);
checkErrorResponse(createSaslHandshakeRequest(),
unknownServerException, true);
checkResponse(createSaslHandshakeResponse(), 0, true);
@@ -1550,18 +1550,37 @@ public class RequestResponseTest {
new Node(0, "test0", 1223),
new Node(1, "test1", 1223)
);
- return new LeaderAndIsrRequest.Builder((short) version, 1, 10, 0,
partitionStates, leaders).build();
+
+ Map<String, Uuid> topicIds = new HashMap<>();
+ topicIds.put("topic5", Uuid.randomUuid());
+ topicIds.put("topic20", Uuid.randomUuid());
+
+ return new LeaderAndIsrRequest.Builder((short) version, 1, 10, 0,
+ partitionStates, topicIds, leaders).build();
}
- private LeaderAndIsrResponse createLeaderAndIsrResponse() {
- List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partitions =
new ArrayList<>();
- partitions.add(new
LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
- .setTopicName("test")
- .setPartitionIndex(0)
- .setErrorCode(Errors.NONE.code()));
- return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
- .setErrorCode(Errors.NONE.code())
- .setPartitionErrors(partitions));
+ // Builds a sample LeaderAndIsrResponse whose layout matches the requested version.
+ private LeaderAndIsrResponse createLeaderAndIsrResponse(int version) {
+ if (version < 5) {
+ // Versions below 5: one partition error identified by topic name, set directly on the response data.
+ List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError>
partitions = new ArrayList<>();
+ partitions.add(new
LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
+ .setTopicName("test")
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code()));
+ return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setPartitionErrors(partitions), (short) version);
+ } else {
+ // Version 5 and above: the partition error is nested in a per-topic entry identified by a topic ID.
+ List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError>
partition = Collections.singletonList(
+ new LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code()));
+ List<LeaderAndIsrResponseData.LeaderAndIsrTopicError> topics = new
ArrayList<>();
+ topics.add(new LeaderAndIsrResponseData.LeaderAndIsrTopicError()
+ .setTopicId(Uuid.randomUuid())
+ .setPartitionErrors(partition));
+ return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+ .setTopics(topics), (short) version);
+ }
}
private UpdateMetadataRequest createUpdateMetadataRequest(int version,
String rack) {
@@ -1600,6 +1619,10 @@ public class RequestResponseTest {
.setReplicas(replicas)
.setOfflineReplicas(offlineReplicas));
+ Map<String, Uuid> topicIds = new HashMap<>();
+ topicIds.put("topic5", Uuid.randomUuid());
+ topicIds.put("topic20", Uuid.randomUuid());
+
SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT;
List<UpdateMetadataEndpoint> endpoints1 = new ArrayList<>();
endpoints1.add(new UpdateMetadataEndpoint()
@@ -2541,7 +2564,8 @@ public class RequestResponseTest {
assertEquals(Integer.valueOf(1),
createHeartBeatResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1),
createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1),
createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE));
- assertEquals(Integer.valueOf(2),
createLeaderAndIsrResponse().errorCounts().get(Errors.NONE));
+ assertEquals(Integer.valueOf(2),
createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE));
+ assertEquals(Integer.valueOf(2),
createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(3),
createLeaderEpochResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1),
createLeaveGroupResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1),
createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE));
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala
b/core/src/main/scala/kafka/api/ApiVersion.scala
index a019032..c859f8d 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -111,7 +111,7 @@ object ApiVersion {
KAFKA_2_7_IV2,
// Flexible versioning on ListOffsets, WriteTxnMarkers and
OffsetsForLeaderEpoch.
KAFKA_2_8_IV0,
- // Add topicId to MetadataUpdateRequest
+ // Introduced topic IDs to LeaderAndIsr and UpdateMetadata
requests/responses (KIP-516)
KAFKA_2_8_IV1
)
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b716552..21a445b 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -455,7 +455,8 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog:
StateChangeLogger): Unit = {
val leaderAndIsrRequestVersion: Short =
- if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4
+ if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) 5
+ else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4
else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3
else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
@@ -482,8 +483,13 @@ abstract class
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
_.node(config.interBrokerListenerName)
}
val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
+ val topicIds = leaderAndIsrPartitionStates.keys
+ .map(_.topic)
+ .toSet[String]
+ .map(topic => (topic, controllerContext.topicIds(topic)))
+ .toMap
val leaderAndIsrRequestBuilder = new
LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
- controllerEpoch, brokerEpoch,
leaderAndIsrPartitionStates.values.toBuffer.asJava, leaders.asJava)
+ controllerEpoch, brokerEpoch,
leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava,
leaders.asJava)
sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse)
=> {
val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
sendEvent(LeaderAndIsrResponseReceived(leaderAndIsrResponse, broker))
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index fe14d42..b382fd9 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1378,11 +1378,10 @@ class KafkaController(val config: KafkaConfig,
val offlineReplicas = new ArrayBuffer[TopicPartition]()
val onlineReplicas = new ArrayBuffer[TopicPartition]()
- leaderAndIsrResponse.partitions.forEach { partition =>
- val tp = new TopicPartition(partition.topicName,
partition.partitionIndex)
- if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+
leaderAndIsrResponse.partitionErrors(controllerContext.topicNames.asJava).forEach{
case (tp, error) =>
+ if (error.code() == Errors.KAFKA_STORAGE_ERROR.code)
offlineReplicas += tp
- else if (partition.errorCode == Errors.NONE.code)
+ else if (error.code() == Errors.NONE.code)
onlineReplicas += tp
}
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 438f234..47d6b92 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,7 +32,7 @@ import kafka.message.{BrokerCompressionCodec,
CompressionCodec, NoCompressionCod
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark,
FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel,
LogOffsetMetadata, OffsetAndEpoch}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark,
FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel,
LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.FetchResponseData
@@ -43,7 +43,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{InvalidRecordException, KafkaException,
TopicPartition}
+import org.apache.kafka.common.{InvalidRecordException, KafkaException,
TopicPartition, Uuid}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -296,11 +296,16 @@ class Log(@volatile private var _dir: File,
// Visible for testing
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
+ @volatile var partitionMetadataFile : Option[PartitionMetadataFile] = None
+
+ @volatile var topicId : Uuid = Uuid.ZERO_UUID
+
locally {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
initializeLeaderEpochCache()
+ initializePartitionMetadata()
val nextOffset = loadSegments()
@@ -324,6 +329,12 @@ class Log(@volatile private var _dir: File,
// deletion.
producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)
+
+ // Recover topic ID if present
+ partitionMetadataFile.foreach { file =>
+ if (!file.isEmpty())
+ topicId = file.read().topicId
+ }
}
def dir: File = _dir
@@ -536,6 +547,11 @@ class Log(@volatile private var _dir: File,
private def recordVersion: RecordVersion =
config.messageFormatVersion.recordVersion
+  /**
+   * Points `partitionMetadataFile` at the partition metadata file located in the
+   * current log directory. The field is replaced while holding the log's lock.
+   */
+  private def initializePartitionMetadata(): Unit = lock.synchronized {
+    val metadataFile = new PartitionMetadataFile(PartitionMetadataFile.newFile(dir), logDirFailureChannel)
+    partitionMetadataFile = Some(metadataFile)
+  }
+
private def initializeLeaderEpochCache(): Unit = lock synchronized {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
@@ -1003,6 +1019,7 @@ class Log(@volatile private var _dir: File,
// re-initialize leader epoch cache so that
LeaderEpochCheckpointFile.checkpoint can correctly reference
// the checkpoint file in renamed log directory
initializeLeaderEpochCache()
+ initializePartitionMetadata()
}
}
}
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
new file mode 100644
index 0000000..1adcbc3
--- /dev/null
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream,
IOException, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.util.regex.Pattern
+
+import kafka.utils.Logging
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.utils.Utils
+
+
+
+object PartitionMetadataFile {
+  private val PartitionMetadataFilename = "partition.metadata"
+  // Matches the ": " separator between a key and its value on each line of the file.
+  private val WhiteSpacesPattern = Pattern.compile(":\\s+")
+  private val CurrentVersion = 0
+
+  /** Returns the location of the partition metadata file inside the given directory. */
+  def newFile(dir: File): File = new File(dir, PartitionMetadataFilename)
+
+  object PartitionMetadataFileFormatter {
+    /** Renders the metadata as the two "key: value" lines that make up the file. */
+    def toFile(data: PartitionMetadata): String = {
+      s"version: ${data.version}\ntopic_id: ${data.topicId}"
+    }
+
+  }
+
+  /**
+   * Parses the two-line content of a partition metadata file.
+   *
+   * @param location path of the file being read; used only in error messages
+   * @param reader   reader over the file content; it is not closed here
+   * @param version  not consulted while parsing: the version is read from the file and
+   *                 compared against CurrentVersion (parameter kept for source compatibility)
+   */
+  class PartitionMetadataReadBuffer[T](location: String,
+                                       reader: BufferedReader,
+                                       version: Int) extends Logging {
+    /**
+     * Reads the version line followed by the topic ID line.
+     *
+     * @return the parsed metadata, always carrying CurrentVersion
+     * @throws IOException if the file ends early, a line is malformed, the version is
+     *                     not CurrentVersion, or the topic ID is the zero UUID
+     */
+    def read(): PartitionMetadata = {
+      def malformedLineException(line: String) =
+        new IOException(s"Malformed line in partition metadata file ($location): '$line'")
+
+      // readLine() returns null at end of input. Surface an empty or truncated file as an
+      // IOException instead of passing null on to the pattern split, so that callers
+      // handling IOException also see this failure.
+      def nextLine(): String = {
+        val next = reader.readLine()
+        if (next == null)
+          throw new IOException(s"Unexpected end of partition metadata file ($location)")
+        next
+      }
+
+      var line: String = null
+      var metadataTopicId: Uuid = null
+      try {
+        line = nextLine()
+        WhiteSpacesPattern.split(line) match {
+          case Array(_, version) =>
+            if (version.toInt == CurrentVersion) {
+              line = nextLine()
+              WhiteSpacesPattern.split(line) match {
+                case Array(_, topicId) => metadataTopicId = Uuid.fromString(topicId)
+                case _ => throw malformedLineException(line)
+              }
+              // The zero UUID is never accepted as a persisted topic ID.
+              if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
+                throw new IOException(s"Invalid topic ID in partition metadata file ($location)")
+              }
+              new PartitionMetadata(CurrentVersion, metadataTopicId)
+            } else {
+              throw new IOException(s"Unrecognized version of partition metadata file ($location): " + version)
+            }
+          case _ => throw malformedLineException(line)
+        }
+      } catch {
+        // A non-numeric version is reported as a malformed line.
+        case _: NumberFormatException => throw malformedLineException(line)
+      }
+    }
+  }
+
+}
+
+// Contents of a partition metadata file: the file format version and the topic ID.
+class PartitionMetadata(val version: Int, val topicId: Uuid)
+
+
+// Reads and writes the partition metadata file at `file`. IOExceptions raised in write() and
+// read() are reported to logDirFailureChannel and rethrown as KafkaStorageException.
+class PartitionMetadataFile(val file: File,
+ logDirFailureChannel: LogDirFailureChannel)
extends Logging {
+ import kafka.server.PartitionMetadataFile.{CurrentVersion,
PartitionMetadataFileFormatter, PartitionMetadataReadBuffer}
+
+ private val path = file.toPath.toAbsolutePath
+ // Sibling temp file that write() fills before moving it over `path`.
+ private val tempPath = Paths.get(path.toString + ".tmp")
+ // Serializes write() and read() on this instance.
+ private val lock = new Object()
+ // Parent of the directory holding the metadata file; this is what gets reported to the failure channel.
+ private val logDir = file.getParentFile.getParent
+
+
+ try Files.createFile(file.toPath) // create the file if it doesn't exist
+ catch { case _: FileAlreadyExistsException => }
+
+ // Persists the given topic ID together with CurrentVersion, replacing any previous content.
+ def write(topicId: Uuid): Unit = {
+ lock synchronized {
+ try {
+ // write to temp file and then swap with the existing file
+ val fileOutputStream = new FileOutputStream(tempPath.toFile)
+ val writer = new BufferedWriter(new
OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
+ try {
+ writer.write(PartitionMetadataFileFormatter.toFile(new
PartitionMetadata(CurrentVersion,topicId)))
+ // Flush the writer, then sync the file descriptor, before the temp file is moved into place.
+ writer.flush()
+ fileOutputStream.getFD().sync()
+ } finally {
+ writer.close()
+ }
+
+ Utils.atomicMoveWithFallback(tempPath, path)
+ } catch {
+ case e: IOException =>
+ val msg = s"Error while writing to partition metadata file
${file.getAbsolutePath}"
+ logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+ throw new KafkaStorageException(msg, e)
+ }
+ }
+ }
+
+ // Parses the file at `path` and returns its version and topic ID.
+ def read(): PartitionMetadata = {
+ lock synchronized {
+ try {
+ val reader = Files.newBufferedReader(path)
+ try {
+ val partitionBuffer = new
PartitionMetadataReadBuffer(file.getAbsolutePath, reader, CurrentVersion)
+ partitionBuffer.read()
+ } finally {
+ reader.close()
+ }
+ } catch {
+ case e: IOException =>
+ val msg = s"Error while reading partition metadata file
${file.getAbsolutePath}"
+ logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+ throw new KafkaStorageException(msg, e)
+ }
+ }
+ }
+
+ // True when the file has zero length. Note this check does not take `lock`.
+ def isEmpty(): Boolean = {
+ file.length() == 0
+ }
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ea44021..a3934a5 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,12 +36,13 @@ import kafka.server.checkpoints.{LazyOffsetCheckpoints,
OffsetCheckpointFile, Of
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicPartition}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.message.{DescribeLogDirsResponseData,
FetchResponseData, LeaderAndIsrResponseData}
+import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError
import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic}
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{OffsetForLeaderTopicResult,
EpochEndOffset}
@@ -1331,6 +1332,7 @@ class ReplicaManager(val config: KafkaConfig,
s"correlation id $correlationId from controller $controllerId " +
s"epoch ${leaderAndIsrRequest.controllerEpoch}")
}
+ val topicIds = leaderAndIsrRequest.topicIds()
val response = {
if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
@@ -1437,6 +1439,24 @@ class ReplicaManager(val config: KafkaConfig,
*/
if (localLog(topicPartition).isEmpty)
markPartitionOffline(topicPartition)
+ else {
+ val id = topicIds.get(topicPartition.topic())
+ // Ensure we have not received a request from an older protocol
+ if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+ val log = localLog(topicPartition).get
+ // Check if topic ID is in memory, if not, it must be new to
the broker and does not have a metadata file.
+ // This is because if the broker previously wrote it to file,
it would be recovered on restart after failure.
+ if (log.topicId.equals(Uuid.ZERO_UUID)) {
+ log.partitionMetadataFile.get.write(id)
+ log.topicId = id
+ // Warn if the topic ID in the request does not match the
log.
+ } else if (!log.topicId.equals(id)) {
+ stateChangeLogger.warn(s"Topic Id in memory:
${log.topicId.toString} does not" +
+ s" match the topic Id provided in the request: " +
+ s"${id.toString}.")
+ }
+ }
+ }
}
// we initialize highwatermark thread after the first
leaderisrrequest. This ensures that all the partitions
@@ -1448,15 +1468,38 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
- val responsePartitions = responseMap.iterator.map { case (tp, error)
=>
- new LeaderAndIsrPartitionError()
- .setTopicName(tp.topic)
- .setPartitionIndex(tp.partition)
- .setErrorCode(error.code)
- }.toBuffer
- new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
- .setErrorCode(Errors.NONE.code)
- .setPartitionErrors(responsePartitions.asJava))
+ if (leaderAndIsrRequest.version() < 5) {
+ val responsePartitions = responseMap.iterator.map { case (tp,
error) =>
+ new LeaderAndIsrPartitionError()
+ .setTopicName(tp.topic)
+ .setPartitionIndex(tp.partition)
+ .setErrorCode(error.code)
+ }.toBuffer
+ new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setPartitionErrors(responsePartitions.asJava),
leaderAndIsrRequest.version())
+ } else {
+ val topics = new mutable.HashMap[String,
List[LeaderAndIsrPartitionError]]
+ responseMap.asJava.forEach { case (tp, error) =>
+ if (!topics.contains(tp.topic)) {
+ topics.put(tp.topic, List(new LeaderAndIsrPartitionError()
+
.setPartitionIndex(tp.partition)
+
.setErrorCode(error.code)))
+ } else {
+ topics.put(tp.topic, new LeaderAndIsrPartitionError()
+ .setPartitionIndex(tp.partition)
+ .setErrorCode(error.code)::topics(tp.topic))
+ }
+ }
+ val topicErrors = topics.iterator.map { case (topic,
partitionError) =>
+ new LeaderAndIsrTopicError()
+ .setTopicId(topicIds.get(topic))
+ .setPartitionErrors(partitionError.asJava)
+ }.toBuffer
+ new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setTopics(topicErrors.asJava), leaderAndIsrRequest.version())
+ }
}
}
val endMs = time.milliseconds()
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index c8122d7..7f84dab 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -507,7 +507,7 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
/**
* Sets the topic znode with the given assignment.
* @param topic the topic whose assignment is being set.
- * @param topicId optional topic ID if the topic has one
+ * @param topicId unique topic ID for the topic
* @param assignment the partition to replica mapping to set for the given
topic
* @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @return SetDataResponse
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 11faf35..0ca1e17 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL,
PREFIXED}
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicPartition, requests}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicPartition, requests, Uuid}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -99,6 +99,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val brokerId: Integer = 0
val topic = "topic"
+ val topicId = Uuid.randomUuid()
val topicPattern = "topic.*"
val transactionalId = "transactional.id"
val producerId = 83392L
@@ -106,6 +107,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val correlationId = 0
val clientId = "client-Id"
val tp = new TopicPartition(topic, part)
+ val topicIds = Collections.singletonMap(topic, topicId)
+ val topicNames = Collections.singletonMap(topicId, topic)
val logDir = "logDir"
val group = "my-group"
val protocolType = "consumer"
@@ -181,7 +184,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) =>
Errors.forCode(
- resp.partitions.asScala.find(p => p.topicName == tp.topic &&
p.partitionIndex == tp.partition).get.errorCode)),
+ resp.topics.asScala.find(t => topicNames.get(t.topicId) ==
tp.topic).get.partitionErrors.asScala.find(
+ p => p.partitionIndex == tp.partition).get.errorCode)),
ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) =>
Errors.forCode(
resp.partitionErrors.asScala.find(pe => pe.topicName == tp.topic &&
pe.partitionIndex == tp.partition).get.errorCode)),
ApiKeys.CONTROLLED_SHUTDOWN -> ((resp:
requests.ControlledShutdownResponse) => resp.error),
@@ -474,6 +478,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setZkVersion(2)
.setReplicas(Seq(brokerId).asJava)
.setIsNew(false)).asJava,
+ topicIds,
Set(new Node(brokerId, "localhost", 0)).asJava).build()
}
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index 837ac9e..7a4d0ba 100644
---
a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -30,6 +30,7 @@ import
org.apache.kafka.common.message.StopReplicaResponseData.StopReplicaPartit
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractControlRequest,
AbstractResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
StopReplicaRequest, StopReplicaResponse, UpdateMetadataRequest,
UpdateMetadataResponse}
+import
org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert._
import org.junit.Test
@@ -72,6 +73,8 @@ class ControllerChannelManagerTest {
assertEquals(1, updateMetadataRequests.size)
val leaderAndIsrRequest = leaderAndIsrRequests.head
+ val topicIds = leaderAndIsrRequest.topicIds();
+ val topicNames = topicIds.asScala.map { case (k, v) => (v, k) }
assertEquals(controllerId, leaderAndIsrRequest.controllerId)
assertEquals(controllerEpoch, leaderAndIsrRequest.controllerEpoch)
assertEquals(partitions.keySet,
@@ -87,7 +90,10 @@ class ControllerChannelManagerTest {
val LeaderAndIsrResponseReceived(leaderAndIsrResponse, brokerId) =
batch.sentEvents.head
assertEquals(2, brokerId)
assertEquals(partitions.keySet,
- leaderAndIsrResponse.partitions.asScala.map(p => new
TopicPartition(p.topicName, p.partitionIndex)).toSet)
+ leaderAndIsrResponse.topics.asScala.flatMap(t =>
t.partitionErrors.asScala.map(p =>
+ new TopicPartition(topicNames(t.topicId), p.partitionIndex))).toSet)
+ leaderAndIsrResponse.topics.forEach(topic =>
+ assertEquals(topicIds.get(topicNames.get(topic.topicId).get),
topic.topicId))
}
@Test
@@ -157,7 +163,8 @@ class ControllerChannelManagerTest {
for (apiVersion <- ApiVersion.allVersions) {
val leaderAndIsrRequestVersion: Short =
- if (apiVersion >= KAFKA_2_4_IV1) 4
+ if (apiVersion >= KAFKA_2_8_IV1) 5
+ else if (apiVersion >= KAFKA_2_4_IV1) 4
else if (apiVersion >= KAFKA_2_4_IV0) 3
else if (apiVersion >= KAFKA_2_2_IV0) 2
else if (apiVersion >= KAFKA_1_0_IV0) 1
@@ -187,6 +194,21 @@ class ControllerChannelManagerTest {
assertEquals(1, leaderAndIsrRequests.size)
assertEquals(s"IBP $interBrokerProtocolVersion should use version
$expectedLeaderAndIsrVersion",
expectedLeaderAndIsrVersion, leaderAndIsrRequests.head.version)
+
+ val request = leaderAndIsrRequests.head
+ val byteBuffer = request.serialize
+ val deserializedRequest = LeaderAndIsrRequest.parse(byteBuffer,
expectedLeaderAndIsrVersion)
+
+ if (interBrokerProtocolVersion >= KAFKA_2_8_IV1) {
+ assertTrue(!request.topicIds().get("foo").equals(Uuid.ZERO_UUID))
+
assertTrue(!deserializedRequest.topicIds().get("foo").equals(Uuid.ZERO_UUID))
+ } else if (interBrokerProtocolVersion >= KAFKA_2_2_IV0) {
+ assertTrue(!request.topicIds().get("foo").equals(Uuid.ZERO_UUID))
+
assertTrue(deserializedRequest.topicIds().get("foo").equals(Uuid.ZERO_UUID))
+ } else {
+ assertTrue(request.topicIds().get("foo") == null)
+ assertTrue(deserializedRequest.topicIds().get("foo") == null)
+ }
}
@Test
@@ -827,15 +849,18 @@ class ControllerChannelManagerTest {
private def applyLeaderAndIsrResponseCallbacks(error: Errors, sentRequests:
List[SentRequest]): Unit = {
sentRequests.filter(_.request.apiKey ==
ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach {
sentRequest =>
val leaderAndIsrRequest =
sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest]
- val partitionErrors = leaderAndIsrRequest.partitionStates.asScala.map(p
=>
- new LeaderAndIsrPartitionError()
- .setTopicName(p.topicName)
- .setPartitionIndex(p.partitionIndex)
- .setErrorCode(error.code))
+ val topicIds = leaderAndIsrRequest.topicIds
+ val topicErrors = leaderAndIsrRequest.data.topicStates.asScala.map(t =>
+ new LeaderAndIsrTopicError()
+ .setTopicId(topicIds.get(t.topicName))
+ .setPartitionErrors(t.partitionStates.asScala.map(p =>
+ new LeaderAndIsrPartitionError()
+ .setPartitionIndex(p.partitionIndex)
+ .setErrorCode(error.code)).asJava))
val leaderAndIsrResponse = new LeaderAndIsrResponse(
new LeaderAndIsrResponseData()
.setErrorCode(error.code)
- .setPartitionErrors(partitionErrors.toBuffer.asJava))
+ .setTopics(topicErrors.toBuffer.asJava),
leaderAndIsrRequest.version())
sentRequest.responseCallback(leaderAndIsrResponse)
}
}
@@ -871,6 +896,11 @@ class ControllerChannelManagerTest {
}.toMap
context.setLiveBrokers(brokerEpochs)
+ context.setAllTopics(topics)
+
+ for (topic <- topics) {
+ context.addTopicId(topic, Uuid.randomUuid())
+ }
// Simple round-robin replica assignment
var leaderIndex = 0
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a13bedc..031000d 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -228,8 +228,8 @@ class LogManagerTest {
s.lazyTimeIndex.get
})
- // there should be a log file, two indexes, one producer snapshot, and the
leader epoch checkpoint
- assertEquals("Files should have been deleted", log.numberOfSegments * 4 +
1, log.dir.list.length)
+ // there should be a log file, two indexes, one producer snapshot,
partition metadata, and the leader epoch checkpoint
+ assertEquals("Files should have been deleted", log.numberOfSegments * 4 +
2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, readLog(log, offset
+ 1).records.sizeInBytes)
try {
@@ -278,8 +278,8 @@ class LogManagerTest {
time.sleep(log.config.fileDeleteDelayMs + 1)
// there should be a log file, two indexes (the txn index is created
lazily),
- // and a producer snapshot file per segment, and the leader epoch
checkpoint.
- assertEquals("Files should have been deleted", log.numberOfSegments * 4 +
1, log.dir.list.length)
+ // and a producer snapshot file per segment, and the leader epoch
checkpoint and partition metadata file.
+ assertEquals("Files should have been deleted", log.numberOfSegments * 4 +
2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, readLog(log, offset
+ 1).records.sizeInBytes)
try {
readLog(log, 0)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ce52c6b..b107c21 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -30,9 +30,9 @@ import kafka.log.Log.DeleteDirSuffix
import kafka.metrics.KafkaYammerMetrics
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo,
FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted,
KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo,
FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted,
KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
import kafka.utils._
-import org.apache.kafka.common.{InvalidRecordException, KafkaException,
TopicPartition}
+import org.apache.kafka.common.{InvalidRecordException, KafkaException,
TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -2372,6 +2372,21 @@ class LogTest {
log.close()
}
+  // Verifies that a topic ID written to the partition metadata file is loaded when the log is reopened.
+  @Test
+  def testLogRecoversTopicId(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    var log = createLog(logDir, logConfig)
+
+    val topicId = Uuid.randomUuid()
+    log.partitionMetadataFile.get.write(topicId)
+    log.close()
+
+    // test recovery case
+    log = createLog(logDir, logConfig)
+    // assertEquals reports both IDs on failure, unlike assertTrue on a boolean comparison.
+    assertEquals(topicId, log.topicId)
+    log.close()
+  }
+
/**
* Test building the time index on the follower by setting assignOffsets to
false.
*/
@@ -2907,6 +2922,33 @@ class LogTest {
}
@Test
+  def testTopicIdTransfersAfterDirectoryRename(): Unit = {
+    // Checks that the topic ID is still available after a log directory rename,
+    // both in memory and in the partition metadata file under the new directory.
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
+    val id = Uuid.randomUuid()
+    log.topicId = id
+    log.partitionMetadataFile.get.write(id)
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.latestEpoch)
+
+    // Ensure that after a directory rename, the partition metadata file is written to the right location.
+    val tp = Log.parseTopicPartitionName(log.dir)
+    log.renameDir(Log.logDeleteDirName(tp))
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
+    assertEquals(Some(10), log.latestEpoch)
+    assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
+    assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
+
+    // Check the topic ID remains in memory and was copied correctly.
+    assertEquals(id, log.topicId)
+    assertFalse(log.partitionMetadataFile.isEmpty)
+    assertEquals(id, log.partitionMetadataFile.get.read().topicId)
+  }
+
+ @Test
def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1000,
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index be8766c..e733909 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -25,7 +25,7 @@ import kafka.controller.{ControllerChannelManager,
ControllerContext, StateChang
import kafka.utils.TestUtils
import kafka.utils.TestUtils.createTopic
import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import
org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState,
StopReplicaTopicState}
import
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
UpdateMetadataEndpoint, UpdateMetadataPartitionState}
@@ -112,6 +112,7 @@ class BrokerEpochIntegrationTest extends
ZooKeeperTestHarness {
private def
testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long):
Unit = {
val tp = new TopicPartition("new-topic", 0)
+ val topicIds = Collections.singletonMap("new-topic", Uuid.randomUuid)
// create topic with 1 partition, 2 replicas, one on each broker
createTopic(zkClient, tp.topic(), partitionReplicaAssignment = Map(0 ->
Seq(brokerId1, brokerId2)), servers = servers)
@@ -155,7 +156,7 @@ class BrokerEpochIntegrationTest extends
ZooKeeperTestHarness {
val requestBuilder = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, controllerEpoch,
epochInRequest,
- partitionStates.asJava, nodes.toSet.asJava)
+ partitionStates.asJava, topicIds, nodes.toSet.asJava)
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in LEADER_AND_ISR
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e37d93b..87a870a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -65,7 +65,7 @@ import org.apache.kafka.common.requests.{FetchMetadata =>
JFetchMetadata, _}
import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.ProducerIdAndEpoch
-import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult,
Authorizer}
import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher}
@@ -2681,12 +2681,13 @@ class KafkaApisTest {
controllerEpoch,
brokerEpochInRequest,
partitionStates,
+ Collections.singletonMap("topicW", Uuid.randomUuid()),
asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
).build()
val request = buildRequest(leaderAndIsrRequest)
val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
- .setPartitionErrors(asList()))
+ .setPartitionErrors(asList()), leaderAndIsrRequest.version())
EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
EasyMock.expect(replicaManager.becomeLeaderOrFollower(
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index a3eb5d7..fa0b940 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -17,7 +17,9 @@
package kafka.server
-import org.apache.kafka.common.TopicPartition
+import java.util.Collections
+
+import org.apache.kafka.common.{TopicPartition, Uuid}
import scala.jdk.CollectionConverters._
import kafka.api.LeaderAndIsr
@@ -155,7 +157,8 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
)
val requestBuilder = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion, controllerId,
staleControllerEpoch,
- servers(brokerId2).kafkaController.brokerEpoch,
partitionStates.asJava, nodes.toSet.asJava)
+ servers(brokerId2).kafkaController.brokerEpoch, partitionStates.asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()), nodes.toSet.asJava)
controllerChannelManager.sendRequest(brokerId2, requestBuilder,
staleControllerEpochCallback)
TestUtils.waitUntilTrue(() => staleControllerEpochDetected, "Controller
epoch should be stale")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f73223e..80fdc60 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.net.InetAddress
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Optional, Properties}
+import java.util.{Collections, Optional, Properties}
import kafka.api._
import kafka.log.{AppendOrigin, Log, LogConfig, LogManager,
ProducerStateManager}
@@ -50,7 +50,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -174,6 +174,7 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
+ val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
val partition = rm.createPartition(new TopicPartition(topic, 0))
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@@ -190,6 +191,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
+ topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
rm.getPartitionOrException(new TopicPartition(topic, 0))
@@ -212,6 +214,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
+ topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
@@ -236,6 +239,7 @@ class ReplicaManagerTest {
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+ val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@@ -248,6 +252,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
+ topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _)
=> ())
@@ -307,6 +312,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) =>
())
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@@ -367,6 +373,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) =>
())
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@@ -473,6 +480,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) =>
())
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@@ -549,6 +557,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2,
"host2", 2)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
rm.getPartitionOrException(new TopicPartition(topic, 0))
@@ -605,6 +614,7 @@ class ReplicaManagerTest {
.setIsNew(true)
val leaderAndIsrRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(leaderAndIsrPartitionState).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse.error)
@@ -696,6 +706,7 @@ class ReplicaManagerTest {
replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](0, 2).asJava
+ val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic ->
Uuid.randomUuid()).asJava
val leaderAndIsrRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(
new LeaderAndIsrPartitionState()
@@ -719,6 +730,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
+ topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
@@ -832,6 +844,7 @@ class ReplicaManagerTest {
val leaderAndIsrRequest0 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId, controllerEpoch, brokerEpoch,
Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId,
aliveBrokerIds)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(followerBrokerId, "host1", 0),
new Node(leaderBrokerId, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
@@ -908,6 +921,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) =>
())
@@ -957,6 +971,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) =>
())
@@ -1007,6 +1022,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) =>
())
@@ -1088,6 +1104,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) =>
())
@@ -1128,6 +1145,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@@ -1161,6 +1179,7 @@ class ReplicaManagerTest {
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
+ val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid())
val becomeLeaderRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@@ -1173,6 +1192,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@@ -1193,6 +1213,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) =>
())
@@ -1209,6 +1230,7 @@ class ReplicaManagerTest {
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
+ val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid())
val becomeLeaderRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@@ -1221,6 +1243,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@@ -1242,6 +1265,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) =>
())
@@ -1269,6 +1293,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@@ -1311,6 +1336,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@@ -1354,6 +1380,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@@ -1732,6 +1759,7 @@ class ReplicaManagerTest {
val tp1 = new TopicPartition(topic, 1)
val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](1, 0).asJava
+ val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic ->
Uuid.randomUuid()).asJava
val leaderAndIsrRequest1 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId, 0, brokerEpoch,
@@ -1757,6 +1785,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
+ topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _)
=> ())
@@ -1787,6 +1816,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
+ topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _)
=> ())
@@ -1821,6 +1851,7 @@ class ReplicaManagerTest {
val tp1 = new TopicPartition(topic, 1)
val partition0Replicas = Seq[Integer](1, 0).asJava
val partition1Replicas = Seq[Integer](1, 0).asJava
+ val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic ->
Uuid.randomUuid()).asJava
val leaderAndIsrRequest1 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId, 0, brokerEpoch,
@@ -1846,6 +1877,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
+ topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _)
=> ())
@@ -1876,6 +1908,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
+ topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _)
=> ())
@@ -1935,6 +1968,7 @@ class ReplicaManagerTest {
val becomeLeaderRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 10,
brokerEpoch,
Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
).build()
@@ -1961,6 +1995,7 @@ class ReplicaManagerTest {
val becomeLeaderRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
).build()
@@ -2110,6 +2145,7 @@ class ReplicaManagerTest {
val becomeLeaderRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+ Collections.singletonMap(tp0.topic(), Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
).build()
@@ -2176,4 +2212,100 @@ class ReplicaManagerTest {
replicaManager.shutdown(false)
}
}
+
+  /**
+   * Verifies that handling a LeaderAndIsr request (latest version) that carries
+   * a topic ID leaves a non-empty partition metadata file holding that ID.
+   */
+  @Test
+  def testPartitionMetadataFile(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+
+      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
+      val id = topicIds.get(topicPartition.topic())
+      val log = replicaManager.localLog(topicPartition).get
+      assertFalse(log.partitionMetadataFile.isEmpty)
+      assertFalse(log.partitionMetadataFile.get.isEmpty())
+      val partitionMetadata = log.partitionMetadataFile.get.read()
+
+      // Current version of PartitionMetadataFile is 0.
+      assertEquals(0, partitionMetadata.version)
+      assertEquals(id, partitionMetadata.topicId)
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  /**
+   * Verifies that the partition metadata file stays empty when the request
+   * supplies no usable topic ID: the topic is absent from the topic ID map, the
+   * topic maps to Uuid.ZERO_UUID, or the request is serialized at an older
+   * version (0 and 4 are exercised).
+   */
+  @Test
+  def testPartitionMetadataFileNotCreated(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      val topicPartitionFoo = new TopicPartition("foo", 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava
+
+      // Round-trips the request through serialize/parse at the requested version.
+      def leaderAndIsrRequest(epoch: Int, name: String, version: Short): LeaderAndIsrRequest = LeaderAndIsrRequest.parse(
+        new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch,
+          Seq(new LeaderAndIsrPartitionState()
+            .setTopicName(name)
+            .setPartitionIndex(0)
+            .setControllerEpoch(0)
+            .setLeader(0)
+            .setLeaderEpoch(epoch)
+            .setIsr(brokerList)
+            .setZkVersion(0)
+            .setReplicas(brokerList)
+            .setIsNew(true)).asJava,
+          topicIds,
+          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version)
+
+      // The file has no contents if the topic does not have an associated topic ID.
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
+      val log = replicaManager.localLog(topicPartition).get
+      assertFalse(log.partitionMetadataFile.isEmpty)
+      assertTrue(log.partitionMetadataFile.get.isEmpty())
+
+      // The file has no contents if the topic has the default UUID.
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
+      val log2 = replicaManager.localLog(topicPartition).get
+      assertFalse(log2.partitionMetadataFile.isEmpty)
+      assertTrue(log2.partitionMetadataFile.get.isEmpty())
+
+      // The file has no contents if the request is an older version (version 0).
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
+      val log3 = replicaManager.localLog(topicPartitionFoo).get
+      assertFalse(log3.partitionMetadataFile.isEmpty)
+      assertTrue(log3.partitionMetadataFile.get.isEmpty())
+
+      // The file has no contents if the request is an older version (version 4).
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
+      val log4 = replicaManager.localLog(topicPartitionFoo).get
+      assertFalse(log4.partitionMetadataFile.isEmpty)
+      assertTrue(log4.partitionMetadataFile.get.isEmpty())
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 9ea4039..ed480c5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -62,6 +62,7 @@ class RequestQuotaTest extends BaseRequestTest {
private val topic = "topic-1"
private val numPartitions = 1
private val tp = new TopicPartition(topic, 0)
+ private val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
private val logDir = "logDir"
private val unthrottledClientId = "unthrottled-client"
private val smallQuotaProducerClientId = "small-quota-producer-client"
@@ -254,6 +255,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setZkVersion(2)
.setReplicas(Seq(brokerId).asJava)
.setIsNew(true)).asJava,
+ topicIds,
Set(new Node(brokerId, "localhost", 0)).asJava)
case ApiKeys.STOP_REPLICA =>
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 436dc9e..ce2ceda 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -21,6 +21,7 @@ import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils._
import java.io.{DataInputStream, File}
import java.net.ServerSocket
+import java.util.Collections
import java.util.concurrent.{Executors, TimeUnit}
import kafka.cluster.Broker
@@ -29,6 +30,7 @@ import kafka.log.LogManager
import kafka.zookeeper.ZooKeeperClientTimeoutException
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@@ -233,7 +235,8 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// Initiate a sendRequest and wait until connection is established and
one byte is received by the peer
val requestBuilder = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
- controllerId, 1, 0L, Seq.empty.asJava,
brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava)
+ controllerId, 1, 0L, Seq.empty.asJava, Collections.singletonMap(topic,
Uuid.randomUuid()),
+ brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava)
controllerChannelManager.sendRequest(1, requestBuilder)
receiveFuture.get(10, TimeUnit.SECONDS)
diff --git
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 97a27d9..1d51585 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -1891,7 +1891,6 @@ public final class MessageDataGenerator implements
MessageClassGenerator {
prefix, field.camelCaseName(), field.camelCaseName());
} else if (field.type().isStruct() ||
field.type() instanceof FieldType.UUIDFieldType) {
- } else if (field.type().isStruct()) {
buffer.printf("+ \"%s%s=\" + %s.toString()%n",
prefix, field.camelCaseName(), field.camelCaseName());
} else if (field.type().isArray()) {