junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1868206289
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -788,7 +814,9 @@ class ReplicaManager(val config: KafkaConfig,
* @param requiredAcks number of replicas who must
acknowledge the append before sending the response
* @param internalTopicsAllowed boolean indicating whether internal
topics can be appended to
* @param origin source of the append request (ie,
client, replication, coordinator)
- * @param entriesPerPartition the records per partition to be
appended
+ * @param entriesPerPartition the records per topic partition to
be appended.
+ * If topic partition contains
Uuid.ZERO_UUID or null as topicId the method
+ * will fall back to the old behaviour
and relay on topic name.
Review Comment:
relay => rely
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1447,37 +1475,36 @@ class ReplicaManager(val config: KafkaConfig,
if (traceEnabled)
trace(s"Append [$entriesPerPartition] to local log")
- entriesPerPartition.map { case (topicPartition, records) =>
-
brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
+ entriesPerPartition.map { case (topicIdPartition, records) =>
+
brokerTopicStats.topicStats(topicIdPartition.topic).totalProduceRequestRate.mark()
brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
// reject appending to internal topics if it is not allowed
- if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
- (new TopicOptionalIdPartition(Optional.empty(), topicPartition),
LogAppendResult(
+
Review Comment:
extra new line
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -788,7 +814,9 @@ class ReplicaManager(val config: KafkaConfig,
* @param requiredAcks number of replicas who must
acknowledge the append before sending the response
* @param internalTopicsAllowed boolean indicating whether internal
topics can be appended to
* @param origin source of the append request (ie,
client, replication, coordinator)
- * @param entriesPerPartition the records per partition to be
appended
+ * @param entriesPerPartition the records per topic partition to
be appended.
+ * If topic partition contains
Uuid.ZERO_UUID or null as topicId the method
Review Comment:
It seems that topicId can only be 0, but not null? If null is indeed
allowed, we need to check that in all places where we check for 0.
##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -136,11 +138,10 @@ class CoordinatorPartitionWriterTest {
VerificationGuard.SENTINEL,
batch
))
-
assertEquals(
batch,
- recordsCapture.getValue.getOrElse(tp,
- throw new AssertionError(s"No records for $tp"))
+ recordsCapture.getValue.find(_._1 == new TopicIdPartition(topicId,
tp)).getOrElse(
Review Comment:
Could we use case to avoid unnamed references?
##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -92,9 +92,12 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) =>
Map[ApiKeys, Nothing => Errors](
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) =>
resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
+
+ val topicId = topicNames.find { case (topicId, topicName) => topicName
== topic}.map(_._1).getOrElse(Uuid.ZERO_UUID)
Review Comment:
It's probably clearer if we do
`val topicId = topicNames.find { case (_, topicName) => topicName ==
topic}.map{ case (topicId, _) => topicId }.getOrElse(Uuid.ZERO_UUID)`
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -953,8 +980,8 @@ class ReplicaManager(val config: KafkaConfig,
}
private def buildProducePartitionStatus(
- results: Map[TopicPartition, LogAppendResult]
- ): Map[TopicPartition, ProducePartitionStatus] = {
+ results: Map[TopicIdPartition, LogAppendResult]
+ ): Map[TopicIdPartition, ProducePartitionStatus] = {
Review Comment:
Could we change topicPartition to topicIdPartition in the line below?
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1486,16 +1513,17 @@ class ReplicaManager(val config: KafkaConfig,
_: RecordTooLargeException |
_: RecordBatchTooLargeException |
_: CorruptRecordException |
- _: KafkaStorageException) =>
- (new TopicOptionalIdPartition(Optional.empty(), topicPartition),
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e),
hasCustomErrorMessage = false))
+ _: KafkaStorageException |
+ _: UnknownTopicIdException) =>
Review Comment:
Could we add the new error code in ProduceResponse? Also, it seems that the
original KIP doesn't include this new error code for the producer. It would be
useful to update the KIP and the email thread.
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2636,20 +2636,22 @@ class KafkaApisTest extends Logging {
@Test
def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse():
Unit = {
val topic = "topic"
- addTopicToMetadataCache(topic, numPartitions = 2)
+ val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
+ val tp = new TopicIdPartition(topicId, 0, "topic")
+ addTopicToMetadataCache(topic, numPartitions = 2, topicId = topicId)
for (version <- ApiKeys.PRODUCE.oldestVersion to
ApiKeys.PRODUCE.latestVersion) {
reset(replicaManager, clientQuotaManager, clientRequestQuotaManager,
requestChannel, txnCoordinator)
- val responseCallback: ArgumentCaptor[Map[TopicPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
-
- val tp = new TopicPartition("topic", 0)
+ val responseCallback: ArgumentCaptor[Map[TopicIdPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] =>
Unit])
val produceRequest = ProduceRequest.forCurrentMagic(new
ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
- .setName(tp.topic).setPartitionData(Collections.singletonList(
+ .setName(tp.topic)
Review Comment:
There is no need to set the topic name? Ditto below.
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3143,7 +3147,7 @@ class ReplicaManagerTest {
requiredAcks = requiredAcks,
internalTopicsAllowed = false,
transactionalId = transactionalId,
- entriesPerPartition = entriesToAppend,
+ entriesPerPartition = entriesToAppend.map(e =>
replicaManager.topicIdPartition(e._1) -> e._2),
Review Comment:
Could we use case to avoid unnamed references?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2478,7 +2493,8 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// Otherwise, the regular appendRecords path is used for all the
non __consumer_offsets
// partitions or for all partitions when the new group coordinator
is disabled.
- controlRecords += partition ->
MemoryRecords.withEndTransactionMarker(
+ // If topicIdPartition contains Uuid.ZERO_UUid or null all
functionality will fall back on topic name.
Review Comment:
The topicId can't be null, right?
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -860,20 +886,21 @@ class ReplicaManager(val config: KafkaConfig,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
transactionalId: String,
- entriesPerPartition: Map[TopicPartition,
MemoryRecords],
- responseCallback: Map[TopicPartition,
PartitionResponse] => Unit,
- recordValidationStatsCallback: Map[TopicPartition,
RecordValidationStats] => Unit = _ => (),
+ entriesPerPartition: Map[TopicIdPartition,
MemoryRecords],
Review Comment:
It would be useful to document that, unlike `appendRecords`, the topicIds in
`entriesPerPartition` are always present. I am still a bit concerned about this
discrepancy. It would be better if these two APIs were consistent.
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2892,18 +2914,20 @@ class KafkaApisTest extends Logging {
val topic = "topic"
val transactionalId = "txn1"
- addTopicToMetadataCache(topic, numPartitions = 2)
+ val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
+ val tp = new TopicIdPartition(topicId, 0, "topic")
+ addTopicToMetadataCache(topic, numPartitions = 2, topicId = tp.topicId())
for (version <- 3 to ApiKeys.PRODUCE.latestVersion) {
reset(replicaManager, clientQuotaManager, clientRequestQuotaManager,
requestChannel, txnCoordinator)
- val tp = new TopicPartition("topic", 0)
-
val produceRequest = ProduceRequest.forCurrentMagic(new
ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
- .setName(tp.topic).setPartitionData(Collections.singletonList(
+ .setName(tp.topic)
Review Comment:
Should we set either the topic name or the topic id as we did in
`testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]