This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e380fbbcc8 KAFKA-19322 Remove the DelayedOperation constructor that
accepts an external lock (#19798)
6e380fbbcc8 is described below
commit 6e380fbbcc8fde22d1f2bb3310e1270d5b3f4837
Author: YuChia Ma <[email protected]>
AuthorDate: Tue May 27 01:05:41 2025 +0800
KAFKA-19322 Remove the DelayedOperation constructor that accepts an
external lock (#19798)
Remove the DelayedOperation constructor that accepts an external lock.
According to this [PR](https://github.com/apache/kafka/pull/19759).
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../src/main/java/kafka/server/share/DelayedShareFetch.java | 2 +-
core/src/main/scala/kafka/server/DelayedProduce.scala | 7 ++-----
core/src/main/scala/kafka/server/ReplicaManager.scala | 7 +------
.../coordinator/AbstractCoordinatorConcurrencyTest.scala | 4 +---
.../transaction/TransactionStateManagerTest.scala | 7 -------
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 3 ---
.../org/apache/kafka/server/purgatory/DelayedOperation.java | 13 +------------
7 files changed, 6 insertions(+), 37 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 8446b7acad6..727a88bc3fc 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -172,7 +172,7 @@ public class DelayedShareFetch extends DelayedOperation {
Uuid fetchId,
long remoteFetchMaxWaitMs
) {
- super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
+ super(shareFetch.fetchParams().maxWaitMs);
this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
this.partitionsAcquired = new LinkedHashMap<>();
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala
b/core/src/main/scala/kafka/server/DelayedProduce.scala
index b60c79125b1..1d21ec78e4c 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -18,7 +18,6 @@
package kafka.server
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import java.util.concurrent.locks.Lock
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
import kafka.utils.Logging
@@ -30,7 +29,6 @@ import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOption
case class ProducePartitionStatus(requiredOffset: Long, responseStatus:
PartitionResponse) {
@volatile var acksPending = false
@@ -59,9 +57,8 @@ object DelayedProduce {
class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
- responseCallback: Map[TopicIdPartition,
PartitionResponse] => Unit,
- lockOpt: Option[Lock])
- extends DelayedOperation(delayMs, lockOpt.toJava) with Logging {
+ responseCallback: Map[TopicIdPartition,
PartitionResponse] => Unit)
+ extends DelayedOperation(delayMs) with Logging {
override lazy val logger: Logger = DelayedProduce.logger
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b0b2e72602a..ba34b2af132 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -71,7 +71,6 @@ import java.lang.{Long => JLong}
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.Lock
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Future,
RejectedExecutionException, TimeUnit}
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import java.util.function.Consumer
@@ -723,7 +722,6 @@ class ReplicaManager(val config: KafkaConfig,
* If topic partition contains
Uuid.ZERO_UUID as topicId the method
* will fall back to the old behaviour
and rely on topic name.
* @param responseCallback callback for sending the response
- * @param delayedProduceLock lock for the delayed actions
* @param recordValidationStatsCallback callback for updating stats on
record conversions
* @param requestLocal container for the stateful instances
scoped to this request -- this must correspond to the
* thread calling this method
@@ -735,7 +733,6 @@ class ReplicaManager(val config: KafkaConfig,
origin: AppendOrigin,
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
responseCallback: Map[TopicIdPartition, PartitionResponse]
=> Unit,
- delayedProduceLock: Option[Lock] = None,
recordValidationStatsCallback: Map[TopicIdPartition,
RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.noCaching,
verificationGuards: Map[TopicPartition, VerificationGuard]
= Map.empty): Unit = {
@@ -762,7 +759,6 @@ class ReplicaManager(val config: KafkaConfig,
maybeAddDelayedProduce(
requiredAcks,
- delayedProduceLock,
timeout,
entriesPerPartition,
localProduceResults,
@@ -967,7 +963,6 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeAddDelayedProduce(
requiredAcks: Short,
- delayedProduceLock: Option[Lock],
timeoutMs: Long,
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
initialAppendResults: Map[TopicIdPartition, LogAppendResult],
@@ -977,7 +972,7 @@ class ReplicaManager(val config: KafkaConfig,
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition,
initialAppendResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
- val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata,
this, responseCallback, delayedProduceLock)
+ val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata,
this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this
delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(new
TopicPartitionOperationKey(_)).toList
diff --git
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 56ccb822947..2e9f95beb51 100644
---
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -20,7 +20,6 @@ package kafka.coordinator
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors}
import java.util.{Collections, Random}
import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.locks.Lock
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.cluster.Partition
import kafka.log.LogManager
@@ -216,7 +215,6 @@ object AbstractCoordinatorConcurrencyTest {
origin: AppendOrigin,
entriesPerPartition: Map[TopicIdPartition,
MemoryRecords],
responseCallback: Map[TopicIdPartition,
PartitionResponse] => Unit,
- delayedProduceLock: Option[Lock] = None,
processingStatsCallback: Map[TopicIdPartition,
RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal =
RequestLocal.noCaching,
verificationGuards: Map[TopicPartition,
VerificationGuard] = Map.empty): Unit = {
@@ -227,7 +225,7 @@ object AbstractCoordinatorConcurrencyTest {
case (tp, _) =>
(tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE,
0L, RecordBatch.NO_TIMESTAMP, 0L)))
})
- val delayedProduce = new DelayedProduce(5, produceMetadata, this,
responseCallback, delayedProduceLock) {
+ val delayedProduce = new DelayedProduce(5, produceMetadata, this,
responseCallback) {
// Complete produce requests after a few attempts to trigger delayed
produce from different threads
val completeAttempts = new AtomicInteger
override def tryComplete(): Boolean = {
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 5923566d92a..212a915c800 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -19,7 +19,6 @@ package kafka.coordinator.transaction
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName
import kafka.server.ReplicaManager
import kafka.utils.TestUtils
@@ -758,7 +757,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
- any[Option[ReentrantLock]],
any(),
any(),
any()
@@ -803,7 +801,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
- any[Option[ReentrantLock]],
any(),
any(),
any()
@@ -847,7 +844,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
- any[Option[ReentrantLock]],
any(),
any(),
any())
@@ -901,7 +897,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
- any[Option[ReentrantLock]],
any(),
any(),
any()
@@ -1118,7 +1113,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
recordsCapture.capture(),
callbackCapture.capture(),
- any[Option[ReentrantLock]],
any(),
any(),
any()
@@ -1271,7 +1265,6 @@ class TransactionStateManagerTest {
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicIdPartition, MemoryRecords]],
capturedArgument.capture(),
- any[Option[ReentrantLock]],
any(),
any(),
any()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5155b9f3aed..c0e6c7b9c91 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2830,7 +2830,6 @@ class KafkaApisTest extends Logging {
any(),
responseCallback.capture(),
any(),
- any(),
ArgumentMatchers.eq(requestLocal),
any()
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new
TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE))))
@@ -2886,7 +2885,6 @@ class KafkaApisTest extends Logging {
any(),
any(),
any(),
- any(),
ArgumentMatchers.eq(requestLocal),
any())
}
@@ -2965,7 +2963,6 @@ class KafkaApisTest extends Logging {
entriesPerPartition.capture(),
responseCallback.capture(),
any(),
- any(),
ArgumentMatchers.eq(RequestLocal.noCaching),
any()
)).thenAnswer { _ =>
diff --git
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
index 16bfdabdb67..82b91c44cca 100644
---
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
+++
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
@@ -18,8 +18,6 @@ package org.apache.kafka.server.purgatory;
import org.apache.kafka.server.util.timer.TimerTask;
-import java.util.Optional;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -41,19 +39,10 @@ public abstract class DelayedOperation extends TimerTask {
private volatile boolean completed = false;
- protected final Lock lock;
-
- public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
- this(delayMs, lockOpt.orElse(new ReentrantLock()));
- }
+ protected final ReentrantLock lock = new ReentrantLock();
public DelayedOperation(long delayMs) {
- this(delayMs, new ReentrantLock());
- }
-
- public DelayedOperation(long delayMs, Lock lock) {
super(delayMs);
- this.lock = lock;
}
/*