This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e2b0a58eda KAFKA-17914: Update string ref with SharePartitionKey.
(#17660)
2e2b0a58eda is described below
commit 2e2b0a58eda3e677763af974a44a6aaa3c280214
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Nov 8 15:05:39 2024 +0530
KAFKA-17914: Update string ref with SharePartitionKey. (#17660)
Currently, we are using the String representation of the
shareCoordinator/sharePartition key (groupId:topicId:partition), as defined in
KIP-932, in a few methods like ShareCoordinator.partitionFor and
ShareCoordinatorMetadataCacheHelper.getShareCoordinator.
This has the potential to introduce subtle bugs when incorrectly formatted
strings are used to invoke these methods. What is worrying is that the failures
might be intermittent.
This PR aims to remedy the situation by changing the type to the concrete
SharePartitionKey. This way callers need not be worried about a specific
encoding or format of the coordinator key as long as the SharePartitionKey has
the correct fields set.
There is one issue - the FIND_COORDINATOR RPC does require the coordinator
key to be set as a String in the request body. We can't get around this and
have to set the value as a String. However, on the KafkaApis handler side, we
parse this key into a SharePartitionKey again and gain compile-time safety. One
downside is that we need to split and format the incoming coordinator key in
the request, but that can be encapsulated at a single location in
SharePartitionKey.
Added tests for partitionFor.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>, Manikumar Reddy <[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 12 +++-
.../ShareCoordinatorMetadataCacheHelperImpl.java | 7 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 11 ++-
.../kafka/coordinator/share/ShareCoordinator.java | 5 +-
.../coordinator/share/ShareCoordinatorService.java | 9 ++-
.../share/ShareCoordinatorServiceTest.java | 37 ++++++++++
.../kafka/server/share/SharePartitionKey.java | 52 ++++++++++++++-
.../share/persister/PersisterStateManager.java | 78 ++++++++++------------
.../ShareCoordinatorMetadataCacheHelper.java | 3 +-
.../share/persister/DefaultStatePersisterTest.java | 2 +-
.../share/persister/PersisterStateManagerTest.java | 10 +--
11 files changed, 162 insertions(+), 64 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0cba5cf4e41..21942229d87 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -76,7 +76,7 @@ import
org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_I
import org.apache.kafka.server.purgatory.TopicPartitionOperationKey
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.share.context.ShareFetchContext
-import org.apache.kafka.server.share.ErroneousAndValidPartitionData
+import org.apache.kafka.server.share.{ErroneousAndValidPartitionData,
SharePartitionKey}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.storage.internals.log.AppendOrigin
@@ -1645,6 +1645,13 @@ class KafkaApis(val requestChannel: RequestChannel,
if (shareCoordinator.isEmpty) {
return (Errors.INVALID_REQUEST, Node.noNode)
}
+ try {
+ SharePartitionKey.validate(key)
+ } catch {
+ case e: IllegalArgumentException =>
+ error(s"Share coordinator key is invalid", e)
+ (Errors.INVALID_REQUEST, Node.noNode())
+ }
}
val (partition, internalTopicName) = CoordinatorType.forId(keyType)
match {
case CoordinatorType.GROUP =>
@@ -1654,8 +1661,7 @@ class KafkaApis(val requestChannel: RequestChannel,
(txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)
case CoordinatorType.SHARE =>
- // None check already done above
- (shareCoordinator.get.partitionFor(key),
SHARE_GROUP_STATE_TOPIC_NAME)
+ (shareCoordinator.foreach(coordinator =>
coordinator.partitionFor(SharePartitionKey.getInstance(key))),
SHARE_GROUP_STATE_TOPIC_NAME)
}
val topicMetadata =
metadataCache.getTopicMetadata(Set(internalTopicName),
request.context.listenerName)
diff --git
a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
index caa7b348f5e..28148eab7ff 100644
---
a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
+++
b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.server.share.SharePartitionKey;
import
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import java.util.HashSet;
@@ -38,12 +39,12 @@ import scala.jdk.javaapi.OptionConverters;
public class ShareCoordinatorMetadataCacheHelperImpl implements
ShareCoordinatorMetadataCacheHelper {
private final MetadataCache metadataCache;
- private final Function<String, Integer> keyToPartitionMapper;
+ private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
private final ListenerName interBrokerListenerName;
public ShareCoordinatorMetadataCacheHelperImpl(
MetadataCache metadataCache,
- Function<String, Integer> keyToPartitionMapper,
+ Function<SharePartitionKey, Integer> keyToPartitionMapper,
ListenerName interBrokerListenerName
) {
Objects.requireNonNull(metadataCache, "metadataCache must not be
null");
@@ -61,7 +62,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl
implements ShareCoordinator
}
@Override
- public Node getShareCoordinator(String key, String internalTopicName) {
+ public Node getShareCoordinator(SharePartitionKey key, String
internalTopicName) {
if (metadataCache.contains(internalTopicName)) {
Set<String> topicSet = new HashSet<>();
topicSet.add(internalTopicName);
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 31fdc9efe81..7b78793373b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1340,6 +1340,10 @@ class KafkaApisTest extends Logging {
topicConfigOverride.put(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeout.toString)
val groupId = "group"
+ val topicId = Uuid.randomUuid
+ val partition = 0
+ var key:String = groupId
+
val topicName =
coordinatorType match {
case CoordinatorType.GROUP =>
@@ -1359,6 +1363,7 @@ class KafkaApisTest extends Logging {
case CoordinatorType.SHARE =>
authorizeResource(authorizer, AclOperation.CLUSTER_ACTION,
ResourceType.CLUSTER,
Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+ key = "%s:%s:%d" format(groupId, topicId, partition)
Topic.SHARE_GROUP_STATE_TOPIC_NAME
case _ =>
throw new IllegalStateException(s"Unknown coordinator type
$coordinatorType")
@@ -1368,12 +1373,12 @@ class KafkaApisTest extends Logging {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(coordinatorType.id())
- .setCoordinatorKeys(asList(groupId)))
+ .setCoordinatorKeys(asList(key)))
} else {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(coordinatorType.id())
- .setKey(groupId))
+ .setKey(key))
}
val request =
buildRequest(findCoordinatorRequestBuilder.build(requestHeader.apiVersion))
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
@@ -1389,7 +1394,7 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.INVALID_REQUEST.code,
response.data.coordinators.get(0).errorCode)
} else if (version >= 4) {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code,
response.data.coordinators.get(0).errorCode)
- assertEquals(groupId, response.data.coordinators.get(0).key)
+ assertEquals(key, response.data.coordinators.get(0).key)
} else {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code,
response.data.errorCode)
assertTrue(capturedRequest.getValue.isEmpty)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index 67f9d606414..dd56503dbaf 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.share.SharePartitionKey;
import java.util.OptionalInt;
import java.util.Properties;
@@ -39,10 +40,10 @@ public interface ShareCoordinator {
/**
* Return the partition index for the given key.
*
- * @param key - groupId:topicId:partitionId.
+ * @param key - reference to {@link SharePartitionKey}.
* @return The partition index.
*/
- int partitionFor(String key);
+ int partitionFor(SharePartitionKey key);
/**
* Return the configuration properties of the share-group state topic.
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index ecec6897dac..05999bd6d25 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -201,9 +201,12 @@ public class ShareCoordinatorService implements
ShareCoordinator {
}
@Override
- public int partitionFor(String key) {
+ public int partitionFor(SharePartitionKey key) {
throwIfNotActive();
- return Utils.abs(key.hashCode()) % numPartitions;
+ // Call to asCoordinatorKey is necessary as we depend only on topicId
(Uuid) and
+ // not topic name. We do not want this calculation to distinguish
between 2
+ // SharePartitionKeys where everything except topic name is the same.
+ return Utils.abs(key.asCoordinatorKey().hashCode()) % numPartitions;
}
@Override
@@ -539,7 +542,7 @@ public class ShareCoordinatorService implements
ShareCoordinator {
}
TopicPartition topicPartitionFor(SharePartitionKey key) {
- return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME,
partitionFor(key.asCoordinatorKey()));
+ return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME,
partitionFor(key));
}
private static <P> boolean isEmpty(List<P> list) {
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index a88aba632a7..c641447dd6b 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
@@ -31,6 +32,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeoutException;
import static
org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -637,4 +640,38 @@ class ShareCoordinatorServiceTest {
assertEquals(Topic.SHARE_GROUP_STATE_TOPIC_NAME, tp.topic());
assertEquals(expectedPartition, tp.partition());
}
+
+ @Test
+ public void testPartitionFor() {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM
+ );
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 0;
+
+ // inactive shard should throw exception
+ assertThrows(CoordinatorNotAvailableException.class, () ->
service.partitionFor(SharePartitionKey.getInstance(groupId, topicId,
partition)));
+
+ final int numPartitions = 50;
+ service.startup(() -> numPartitions);
+
+ final SharePartitionKey key1 = SharePartitionKey.getInstance(groupId,
new TopicIdPartition(topicId, partition, null));
+ int sharePartitionKey = service.partitionFor(key1);
+ assertEquals(Utils.abs(key1.asCoordinatorKey().hashCode()) %
numPartitions, sharePartitionKey);
+
+ // The presence of a topic name should not affect the choice of
partition
+ final SharePartitionKey key2 = new SharePartitionKey(groupId, new
TopicIdPartition(topicId, partition, "whatever"));
+ sharePartitionKey = service.partitionFor(key2);
+ assertEquals(Utils.abs(key2.asCoordinatorKey().hashCode()) %
numPartitions, sharePartitionKey);
+
+ // asCoordinatorKey does not discriminate on topic name
+ assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey());
+ }
}
diff --git
a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
index 29ce801aff3..89f555b564e 100644
--- a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
+++ b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
@@ -65,6 +65,56 @@ public class SharePartitionKey {
return getInstance(groupId, topicIdPartition.topicId(),
topicIdPartition.partition());
}
+
+ /**
+ * Returns a SharePartitionKey from input string of format -
groupId:topicId:partition
+ * @param key - String in format groupId:topicId:partition
+ * @return object representing SharePartitionKey
+ * @throws IllegalArgumentException if the key is empty or has invalid
format
+ */
+ public static SharePartitionKey getInstance(String key) {
+ validate(key);
+ String[] tokens = key.split(":");
+ return new SharePartitionKey(
+ tokens[0].trim(),
+ Uuid.fromString(tokens[1]),
+ Integer.parseInt(tokens[2])
+ );
+ }
+
+ /**
+ * Validates whether the String argument has a valid SharePartitionKey
format - groupId:topicId:partition
+ * @param key - String in format groupId:topicId:partition
+ * @throws IllegalArgumentException if the key is empty or has invalid
format
+ */
+ public static void validate(String key) {
+ Objects.requireNonNull(key, "Share partition key cannot be null");
+ if (key.isEmpty()) {
+ throw new IllegalArgumentException("Share partition key cannot be
empty");
+ }
+
+ String[] tokens = key.split(":");
+ if (tokens.length != 3) {
+ throw new IllegalArgumentException("Invalid key format: expected -
groupId:topicId:partition, found - " + key);
+ }
+
+ if (tokens[0].trim().isEmpty()) {
+ throw new IllegalArgumentException("GroupId must be alphanumeric
string");
+ }
+
+ try {
+ Uuid.fromString(tokens[1]);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid topic ID: " +
tokens[1], e);
+ }
+
+ try {
+ Integer.parseInt(tokens[2]);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid partition: " +
tokens[2], e);
+ }
+ }
+
public static SharePartitionKey getInstance(String groupId, Uuid topicId,
int partition) {
return new SharePartitionKey(groupId, topicId, partition);
}
@@ -97,7 +147,7 @@ public class SharePartitionKey {
@Override
public String toString() {
return "SharePartitionKey{" +
- "groupId='" + groupId +
+ "groupId=" + groupId +
", topicIdPartition=" + topicIdPartition +
'}';
}
diff --git
a/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
b/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 45bbf52a39d..ea14c4f59b4 100644
---
a/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++
b/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -206,12 +206,10 @@ public class PersisterStateManager {
*/
public abstract class PersisterStateManagerHandler implements
RequestCompletionHandler {
protected Node coordinatorNode;
- protected final String groupId;
- protected final Uuid topicId;
- protected final int partition;
private final BackoffManager findCoordBackoff;
protected final Logger log = LoggerFactory.getLogger(getClass());
private Consumer<ClientResponse> onCompleteCallback;
+ protected final SharePartitionKey partitionKey;
public PersisterStateManagerHandler(
String groupId,
@@ -221,12 +219,10 @@ public class PersisterStateManager {
long backoffMaxMs,
int maxRPCRetryAttempts
) {
- this.groupId = groupId;
- this.topicId = topicId;
- this.partition = partition;
this.findCoordBackoff = new BackoffManager(maxRPCRetryAttempts,
backoffMs, backoffMaxMs);
this.onCompleteCallback = response -> {
}; // noop
+ partitionKey = SharePartitionKey.getInstance(groupId, topicId,
partition);
}
/**
@@ -290,7 +286,7 @@ public class PersisterStateManager {
protected AbstractRequest.Builder<FindCoordinatorRequest>
findShareCoordinatorBuilder() {
return new FindCoordinatorRequest.Builder(new
FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.SHARE.id())
- .setKey(coordinatorKey()));
+ .setKey(partitionKey().asCoordinatorKey()));
}
public void addRequestToNodeMap(Node node,
PersisterStateManagerHandler handler) {
@@ -300,7 +296,7 @@ public class PersisterStateManager {
synchronized (nodeMapLock) {
nodeRPCMap.computeIfAbsent(node, k -> new HashMap<>())
.computeIfAbsent(handler.rpcType(), k -> new HashMap<>())
- .computeIfAbsent(handler.groupId, k -> new LinkedList<>())
+ .computeIfAbsent(partitionKey().groupId(), k -> new
LinkedList<>())
.add(handler);
}
sender.wakeup();
@@ -317,7 +313,7 @@ public class PersisterStateManager {
}
if (cacheHelper.containsTopic(Topic.SHARE_GROUP_STATE_TOPIC_NAME))
{
log.debug("{} internal topic already exists.",
Topic.SHARE_GROUP_STATE_TOPIC_NAME);
- Node node = cacheHelper.getShareCoordinator(coordinatorKey(),
Topic.SHARE_GROUP_STATE_TOPIC_NAME);
+ Node node = cacheHelper.getShareCoordinator(partitionKey(),
Topic.SHARE_GROUP_STATE_TOPIC_NAME);
if (node != Node.noNode()) {
log.debug("Found coordinator node in cache: {}", node);
coordinatorNode = node;
@@ -333,8 +329,8 @@ public class PersisterStateManager {
*
* @return String
*/
- protected String coordinatorKey() {
- return SharePartitionKey.asCoordinatorKey(groupId, topicId,
partition);
+ protected SharePartitionKey partitionKey() {
+ return partitionKey;
}
/**
@@ -378,7 +374,7 @@ public class PersisterStateManager {
findCoordBackoff.incrementAttempt();
List<FindCoordinatorResponseData.Coordinator> coordinators =
((FindCoordinatorResponse) response.responseBody()).coordinators();
if (coordinators.size() != 1) {
- log.error("Find coordinator response for {} is invalid",
coordinatorKey());
+ log.error("Find coordinator response for {} is invalid",
partitionKey());
findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new
IllegalStateException("Invalid response with multiple coordinators."));
return;
}
@@ -399,12 +395,12 @@ public class PersisterStateManager {
}
break;
- case COORDINATOR_NOT_AVAILABLE: // retryable error codes
+ case COORDINATOR_NOT_AVAILABLE: // retriable error codes
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retryable error in find coordinator:
{}", error.message());
+ log.warn("Received retriable error in find coordinator for
{} using key {}: {}", name(), partitionKey(), error.message());
if (!findCoordBackoff.canAttempt()) {
- log.error("Exhausted max retries to find coordinator
without success.");
+ log.error("Exhausted max retries to find coordinator
for {} using key {} without success.", name(), partitionKey());
findCoordinatorErrorResponse(error, new
Exception("Exhausted max retries to find coordinator without success."));
break;
}
@@ -413,7 +409,7 @@ public class PersisterStateManager {
break;
default:
- log.error("Unable to find coordinator.");
+ log.error("Unable to find coordinator for {} using key
{}.", name(), partitionKey());
findCoordinatorErrorResponse(error, null);
}
}
@@ -517,9 +513,9 @@ public class PersisterStateManager {
WriteShareGroupStateResponse combinedResponse =
(WriteShareGroupStateResponse) response.responseBody();
for (WriteShareGroupStateResponseData.WriteStateResult
writeStateResult : combinedResponse.data().results()) {
- if (writeStateResult.topicId().equals(topicId)) {
+ if
(writeStateResult.topicId().equals(partitionKey().topicId())) {
Optional<WriteShareGroupStateResponseData.PartitionResult>
partitionStateData =
-
writeStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partition)
+
writeStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
.findFirst();
if (partitionStateData.isPresent()) {
@@ -528,20 +524,20 @@ public class PersisterStateManager {
case NONE:
writeStateBackoff.resetAttempts();
WriteShareGroupStateResponseData.WriteStateResult result =
WriteShareGroupStateResponse.toResponseWriteStateResult(
- topicId,
+ partitionKey().topicId(),
Collections.singletonList(partitionStateData.get())
);
this.result.complete(new
WriteShareGroupStateResponse(
new
WriteShareGroupStateResponseData().setResults(Collections.singletonList(result))));
return;
- // check retryable errors
+ // check retriable errors
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retryable error in write
state RPC: {}", error.message());
+ log.warn("Received retriable error in write
state RPC for key {}: {}", partitionKey(), error.message());
if (!writeStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for write
state RPC without success.");
+ log.error("Exhausted max retries for write
state RPC for key {} without success.", partitionKey());
writeStateErrorResponse(error, new
Exception("Exhausted max retries to complete write state RPC without
success."));
return;
}
@@ -550,7 +546,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform write state RPC:
{}", error.message());
+ log.error("Unable to perform write state RPC
for key {}: {}", partitionKey(), error.message());
writeStateErrorResponse(error, null);
return;
}
@@ -560,21 +556,21 @@ public class PersisterStateManager {
// no response found specific topic partition
IllegalStateException exception = new IllegalStateException(
- "Failed to write state for partition " + partition + " in
topic " + topicId + " for group " + groupId
+ "Failed to write state for share partition: " + partitionKey()
);
writeStateErrorResponse(Errors.forException(exception), exception);
}
private void writeStateErrorResponse(Errors error, Exception
exception) {
this.result.complete(new WriteShareGroupStateResponse(
- WriteShareGroupStateResponse.toErrorResponseData(topicId,
partition, error, "Error in write state RPC. " +
+
WriteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(),
partitionKey().partition(), error, "Error in write state RPC. " +
(exception == null ? error.message() :
exception.getMessage()))));
}
@Override
protected void findCoordinatorErrorResponse(Errors error, Exception
exception) {
this.result.complete(new WriteShareGroupStateResponse(
- WriteShareGroupStateResponse.toErrorResponseData(topicId,
partition, error, "Error in find coordinator. " +
+
WriteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(),
partitionKey().partition(), error, "Error in find coordinator. " +
(exception == null ? error.message() :
exception.getMessage()))));
}
@@ -595,7 +591,6 @@ public class PersisterStateManager {
public class ReadStateHandler extends PersisterStateManagerHandler {
private final int leaderEpoch;
- private final String coordinatorKey;
private final CompletableFuture<ReadShareGroupStateResponse> result;
private final BackoffManager readStateBackoff;
@@ -612,7 +607,6 @@ public class PersisterStateManager {
) {
super(groupId, topicId, partition, backoffMs, backoffMaxMs,
maxRPCRetryAttempts);
this.leaderEpoch = leaderEpoch;
- this.coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId,
topicId, partition);
this.result = result;
this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts,
backoffMs, backoffMaxMs);
}
@@ -660,9 +654,9 @@ public class PersisterStateManager {
ReadShareGroupStateResponse combinedResponse =
(ReadShareGroupStateResponse) response.responseBody();
for (ReadShareGroupStateResponseData.ReadStateResult
readStateResult : combinedResponse.data().results()) {
- if (readStateResult.topicId().equals(topicId)) {
+ if
(readStateResult.topicId().equals(partitionKey().topicId())) {
Optional<ReadShareGroupStateResponseData.PartitionResult>
partitionStateData =
-
readStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partition)
+
readStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
.findFirst();
if (partitionStateData.isPresent()) {
@@ -671,20 +665,20 @@ public class PersisterStateManager {
case NONE:
readStateBackoff.resetAttempts();
ReadShareGroupStateResponseData.ReadStateResult result =
ReadShareGroupStateResponse.toResponseReadStateResult(
- topicId,
+ partitionKey().topicId(),
Collections.singletonList(partitionStateData.get())
);
this.result.complete(new
ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(result))));
return;
- // check retryable errors
+ // check retriable errors
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retryable error in read
state RPC: {}", error.message());
+ log.warn("Received retriable error in read
state RPC for key {}: {}", partitionKey(), error.message());
if (!readStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for read
state RPC without success.");
+ log.error("Exhausted max retries for read
state RPC for key {} without success.", partitionKey());
readStateErrorReponse(error, new
Exception("Exhausted max retries to complete read state RPC without success."));
return;
}
@@ -693,7 +687,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform read state RPC:
{}", error.message());
+ log.error("Unable to perform read state RPC
for key {}: {}", partitionKey(), error.message());
readStateErrorReponse(error, null);
return;
}
@@ -703,21 +697,21 @@ public class PersisterStateManager {
// no response found specific topic partition
IllegalStateException exception = new IllegalStateException(
- "Failed to read state for partition " + partition + " in topic
" + topicId + " for group " + groupId
+ "Failed to read state for share partition " + partitionKey()
);
readStateErrorReponse(Errors.forException(exception), exception);
}
protected void readStateErrorReponse(Errors error, Exception
exception) {
this.result.complete(new ReadShareGroupStateResponse(
- ReadShareGroupStateResponse.toErrorResponseData(topicId,
partition, error, "Error in find coordinator. " +
+
ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(),
partitionKey().partition(), error, "Error in find coordinator. " +
(exception == null ? error.message() :
exception.getMessage()))));
}
@Override
protected void findCoordinatorErrorResponse(Errors error, Exception
exception) {
this.result.complete(new ReadShareGroupStateResponse(
- ReadShareGroupStateResponse.toErrorResponseData(topicId,
partition, error, "Error in read state RPC. " +
+
ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(),
partitionKey().partition(), error, "Error in read state RPC. " +
(exception == null ? error.message() :
exception.getMessage()))));
}
@@ -928,10 +922,10 @@ public class PersisterStateManager {
handlers.forEach(persHandler -> {
assert persHandler instanceof WriteStateHandler;
WriteStateHandler handler = (WriteStateHandler) persHandler;
- partitionData.computeIfAbsent(handler.topicId, topicId -> new
LinkedList<>())
+
partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new
LinkedList<>())
.add(
new WriteShareGroupStateRequestData.PartitionData()
- .setPartition(handler.partition)
+ .setPartition(handler.partitionKey().partition())
.setStateEpoch(handler.stateEpoch)
.setLeaderEpoch(handler.leaderEpoch)
.setStartOffset(handler.startOffset)
@@ -959,10 +953,10 @@ public class PersisterStateManager {
handlers.forEach(persHandler -> {
assert persHandler instanceof ReadStateHandler;
ReadStateHandler handler = (ReadStateHandler) persHandler;
- partitionData.computeIfAbsent(handler.topicId, topicId -> new
LinkedList<>())
+
partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new
LinkedList<>())
.add(
new ReadShareGroupStateRequestData.PartitionData()
- .setPartition(handler.partition)
+ .setPartition(handler.partitionKey().partition())
.setLeaderEpoch(handler.leaderEpoch)
);
});
diff --git
a/share/src/main/java/org/apache/kafka/server/share/persister/ShareCoordinatorMetadataCacheHelper.java
b/share/src/main/java/org/apache/kafka/server/share/persister/ShareCoordinatorMetadataCacheHelper.java
index 75e9cd62960..5a18a370c2e 100644
---
a/share/src/main/java/org/apache/kafka/server/share/persister/ShareCoordinatorMetadataCacheHelper.java
+++
b/share/src/main/java/org/apache/kafka/server/share/persister/ShareCoordinatorMetadataCacheHelper.java
@@ -18,13 +18,14 @@
package org.apache.kafka.server.share.persister;
import org.apache.kafka.common.Node;
+import org.apache.kafka.server.share.SharePartitionKey;
import java.util.List;
public interface ShareCoordinatorMetadataCacheHelper {
boolean containsTopic(String topic);
- Node getShareCoordinator(String key, String internalTopicName);
+ Node getShareCoordinator(SharePartitionKey key, String internalTopicName);
List<Node> getClusterNodes();
}
diff --git
a/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
b/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index f3c285cd5ca..f2cca9c4b3f 100644
---
a/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++
b/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -107,7 +107,7 @@ class DefaultStatePersisterTest {
}
@Override
- public Node getShareCoordinator(String key, String
internalTopicName) {
+ public Node getShareCoordinator(SharePartitionKey key, String
internalTopicName) {
return Node.noNode();
}
diff --git
a/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
b/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index 464dd4c0517..e410c2e3588 100644
---
a/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++
b/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -142,7 +142,7 @@ class PersisterStateManagerTest {
this.result.complete(new TestHandlerResponse(new
TestHandlerResponseData()
.setResults(Collections.singletonList(new
WriteShareGroupStateResponseData.WriteStateResult()
.setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
- .setPartition(partition)
+ .setPartition(partitionKey().partition())
.setErrorMessage(Errors.NONE.message())
.setErrorCode(Errors.NONE.code()))
)
@@ -159,9 +159,9 @@ class PersisterStateManagerTest {
protected void findCoordinatorErrorResponse(Errors error, Exception
exception) {
this.result.complete(new TestHandlerResponse(new
TestHandlerResponseData()
.setResults(Collections.singletonList(new
WriteShareGroupStateResponseData.WriteStateResult()
- .setTopicId(topicId)
+ .setTopicId(partitionKey().topicId())
.setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
- .setPartition(partition)
+ .setPartition(partitionKey().partition())
.setErrorMessage(exception == null ? error.message() :
exception.getMessage())
.setErrorCode(error.code()))
)
@@ -198,7 +198,7 @@ class PersisterStateManagerTest {
}
@Override
- public Node getShareCoordinator(String key, String
internalTopicName) {
+ public Node getShareCoordinator(SharePartitionKey key, String
internalTopicName) {
return Node.noNode();
}
@@ -217,7 +217,7 @@ class PersisterStateManagerTest {
}
@Override
- public Node getShareCoordinator(String key, String
internalTopicName) {
+ public Node getShareCoordinator(SharePartitionKey key, String
internalTopicName) {
return coordinatorNode;
}