[ https://issues.apache.org/jira/browse/KAFKA-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644201#comment-16644201 ]

ASF GitHub Bot commented on KAFKA-7439:
---------------------------------------

lindong28 closed pull request #5691: KAFKA-7439: Replace EasyMock and PowerMock with Mockito in clients module
URL: https://github.com/apache/kafka/pull/5691

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/build.gradle b/build.gradle
index 47fa18620fd..d230c115a83 100644
--- a/build.gradle
+++ b/build.gradle
@@ -829,9 +829,7 @@ project(':clients') {
 
     testCompile libs.bcpkix
     testCompile libs.junit
-    testCompile libs.easymock
-    testCompile libs.powermockJunit4
-    testCompile libs.powermockEasymock
+    testCompile libs.mockitoCore
 
     testRuntime libs.slf4jlog4j
     testRuntime libs.jacksonDatabind
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bd5c11fc579..7810a3e8673 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -28,6 +28,7 @@
   <allow pkg="org.slf4j" />
   <allow pkg="org.junit" />
   <allow pkg="org.hamcrest" />
+  <allow pkg="org.mockito" />
   <allow pkg="org.easymock" />
   <allow pkg="org.powermock" />
   <allow pkg="java.security" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 1413eacacce..0abb5c45ba6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -50,7 +50,7 @@
 * is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
  * manage topics while producers rely on topic expiry to limit the refresh set.
  */
-public final class Metadata implements Closeable {
+public class Metadata implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(Metadata.class);
 
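Why this matters for the migration: plain Mockito (without the inline mock maker) cannot mock final classes, which is why the old tests had to prepare Metadata with PowerMock. Dropping the final modifier is what lets the tests below write a plain mock. A minimal sketch, assuming the usual Mockito static imports (the stubbed value is illustrative):

    // hypothetical test fragment; Cluster.empty() stands in for whatever the test needs
    Metadata metadata = mock(Metadata.class);
    when(metadata.fetch()).thenReturn(Cluster.empty());
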
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index e4ba19779e6..b2098bfa065 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -595,6 +595,8 @@ public void close() {
     @Override
     public Node leastLoadedNode(long now) {
         List<Node> nodes = this.metadataUpdater.fetchNodes();
+        if (nodes.isEmpty())
+            throw new IllegalStateException("There are no nodes in the Kafka 
cluster");
         int inflight = Integer.MAX_VALUE;
         Node found = null;
 
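This guard is a small behavior change folded into the PR: with no known nodes, leastLoadedNode() now fails fast instead of quietly returning null. A hedged sketch of the new failure mode (the client and time variables are assumed to exist in the calling test):

    // assumes metadataUpdater.fetchNodes() is empty for this client
    try {
        client.leastLoadedNode(time.milliseconds());
        fail("Expected IllegalStateException when the cluster has no nodes");
    } catch (IllegalStateException expected) {
        // previously the method would have returned null here
    }
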
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e249c12d5cc..316b024e996 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -252,7 +252,6 @@
     private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
-    private final int requestTimeoutMs;
     private final ProducerInterceptors<K, V> interceptors;
     private final ApiVersions apiVersions;
     private final TransactionManager transactionManager;
@@ -269,7 +268,7 @@
      *
      */
     public KafkaProducer(final Map<String, Object> configs) {
-        this(new ProducerConfig(configs), null, null, null, null);
+        this(new ProducerConfig(configs), null, null, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -287,10 +286,7 @@ public KafkaProducer(final Map<String, Object> configs) {
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> 
keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, 
keySerializer, valueSerializer)),
-            keySerializer,
-            valueSerializer,
-            null,
-            null);
+            keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -301,7 +297,7 @@ public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, S
      * @param properties   The producer configs
      */
     public KafkaProducer(Properties properties) {
-        this(new ProducerConfig(properties), null, null, null, null);
+        this(new ProducerConfig(properties), null, null, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -317,20 +313,22 @@ public KafkaProducer(Properties properties) {
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, 
Serializer<V> valueSerializer) {
         this(new 
ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, 
valueSerializer)),
-                keySerializer, valueSerializer, null, null);
+                keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
     }
 
-    @SuppressWarnings("unchecked")
     // visible for testing
+    @SuppressWarnings("unchecked")
     KafkaProducer(ProducerConfig config,
                   Serializer<K> keySerializer,
                   Serializer<V> valueSerializer,
                   Metadata metadata,
-                  KafkaClient kafkaClient) {
+                  KafkaClient kafkaClient,
+                  ProducerInterceptors interceptors,
+                  Time time) {
         try {
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
-            this.time = Time.SYSTEM;
+            this.time = time;
             String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
@@ -356,7 +354,6 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
                     Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
-            ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
@@ -378,20 +375,21 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
 
             // load interceptors and make sure they get clientId
             userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-            List<ProducerInterceptor<K, V>> interceptorList = (List) (new 
ProducerConfig(userProvidedConfigs, 
false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                    ProducerInterceptor.class);
-            this.interceptors = new ProducerInterceptors<>(interceptorList);
-            ClusterResourceListeners clusterResourceListeners = 
configureClusterResourceListeners(keySerializer, valueSerializer, 
interceptorList, reporters);
+            ProducerConfig configWithClientId = new 
ProducerConfig(userProvidedConfigs, false);
+            List<ProducerInterceptor<K, V>> interceptorList = (List) 
configWithClientId.getConfiguredInstances(
+                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
ProducerInterceptor.class);
+            if (interceptors != null)
+                this.interceptors = interceptors;
+            else
+                this.interceptors = new 
ProducerInterceptors<>(interceptorList);
+            ClusterResourceListeners clusterResourceListeners = 
configureClusterResourceListeners(keySerializer,
+                    valueSerializer, interceptorList, reporters);
             this.maxRequestSize = 
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = 
config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = 
CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
 
             this.maxBlockTimeMs = 
config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            this.requestTimeoutMs = 
config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             this.transactionManager = configureTransactionState(config, 
logContext, log);
-            int retries = configureRetries(config, transactionManager != null, 
log);
-            int maxInflightRequests = configureInflightRequests(config, 
transactionManager != null);
-            short acks = configureAcks(config, transactionManager != null, 
log);
             int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
 
             this.apiVersions = new ApiVersions();
@@ -413,44 +411,13 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
             } else {
                 this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                     true, true, clusterResourceListeners);
-                this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
+                this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
             }
-            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
-            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
-            KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
-                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                            this.metrics, time, "producer", channelBuilder, logContext),
-                    this.metadata,
-                    clientId,
-                    maxInflightRequests,
-                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
-                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
-                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
-                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
-                    this.requestTimeoutMs,
-                    time,
-                    true,
-                    apiVersions,
-                    throttleTimeSensor,
-                    logContext);
-            this.sender = new Sender(logContext,
-                    client,
-                    this.metadata,
-                    this.accumulator,
-                    maxInflightRequests == 1,
-                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
-                    acks,
-                    retries,
-                    metricsRegistry.senderMetrics,
-                    Time.SYSTEM,
-                    this.requestTimeoutMs,
-                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
-                    this.transactionManager,
-                    apiVersions);
+            this.errors = this.metrics.sensor("errors");
+            this.sender = newSender(logContext, kafkaClient, this.metadata);
             String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
-            this.errors = this.metrics.sensor("errors");
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
             log.debug("Kafka producer started");
@@ -462,6 +429,47 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
         }
     }
 
+    // visible for testing
+    Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
+        int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
+        int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig);
+        ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
+        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
+        KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
+                new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                        this.metrics, time, "producer", channelBuilder, logContext),
+                metadata,
+                clientId,
+                maxInflightRequests,
+                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
+                producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
+                requestTimeoutMs,
+                time,
+                true,
+                apiVersions,
+                throttleTimeSensor,
+                logContext);
+        int retries = configureRetries(producerConfig, transactionManager != null, log);
+        short acks = configureAcks(producerConfig, transactionManager != null, log);
+        return new Sender(logContext,
+                client,
+                metadata,
+                this.accumulator,
+                maxInflightRequests == 1,
+                producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
+                acks,
+                retries,
+                metricsRegistry.senderMetrics,
+                time,
+                requestTimeoutMs,
+                producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
+                this.transactionManager,
+                apiVersions);
+    }
+
     private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
         int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
         int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
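The extracted newSender() is the testing seam that replaces PowerMock's private-field injection: a test can subclass KafkaProducer and override the method to control which Metadata the Sender's I/O thread sees. The tests later in this diff use exactly this shape; a condensed sketch (config, mockMetadata and mockClient are assumed test fixtures):

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
            mockMetadata, mockClient, null, Time.SYSTEM) {
        @Override
        Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
            // keep the mocked Metadata out of the Sender so verify() counts only KafkaProducer's calls
            return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000, true));
        }
    };
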
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index c5df2dacbca..22d747265d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -135,7 +135,7 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx
                         } finally {
                             long endWaitNs = time.nanoseconds();
                             timeNs = Math.max(0L, endWaitNs - startWaitNs);
-                            this.waitTime.record(timeNs, time.milliseconds());
+                            recordWaitTime(timeNs);
                         }
 
                         if (waitingTimeElapsed) {
@@ -185,6 +185,11 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx
             return buffer;
     }
 
+    // Protected for testing
+    protected void recordWaitTime(long timeNs) {
+        this.waitTime.record(timeNs, time.milliseconds());
+    }
+
     /**
     * Allocate a buffer.  If buffer allocation fails (e.g. because of OOM) then return the size count back to
      * available memory and signal the next waiter if it exists.
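Extracting recordWaitTime() into a protected method is what lets BufferPoolTest drop @PrepareForTest on Sensor and MetricName: a plain Mockito spy can now override the single metrics call. The updated test near the end of this diff does exactly this; in sketch form:

    BufferPool pool = spy(new BufferPool(2, 1, new Metrics(), time, metricGroup));
    doThrow(new OutOfMemoryError()).when(pool).recordWaitTime(anyLong());
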
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7640377ac17..c50a85f06da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,8 +40,6 @@
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -149,7 +147,7 @@ public Sender(LogContext logContext,
         this.acks = acks;
         this.retries = retries;
         this.time = time;
-        this.sensors = new SenderMetrics(metricsRegistry);
+        this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time);
         this.requestTimeoutMs = requestTimeoutMs;
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
@@ -811,7 +809,7 @@ public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) {
     /**
      * A collection of sensors for the sender
      */
-    private class SenderMetrics {
+    private static class SenderMetrics {
         public final Sensor retrySensor;
         public final Sensor errorSensor;
         public final Sensor queueTimeSensor;
@@ -822,9 +820,11 @@ public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) {
         public final Sensor maxRecordSizeSensor;
         public final Sensor batchSplitSensor;
         private final SenderMetricsRegistry metrics;
+        private final Time time;
 
-        public SenderMetrics(SenderMetricsRegistry metrics) {
+        public SenderMetrics(SenderMetricsRegistry metrics, Metadata metadata, KafkaClient client, Time time) {
             this.metrics = metrics;
+            this.time = time;
 
             this.batchSizeSensor = metrics.sensor("batch-size");
             this.batchSizeSensor.add(metrics.batchSizeAvg, new Avg());
@@ -855,16 +855,9 @@ public SenderMetrics(SenderMetricsRegistry metrics) {
             this.maxRecordSizeSensor.add(metrics.recordSizeMax, new Max());
             this.maxRecordSizeSensor.add(metrics.recordSizeAvg, new Avg());
 
-            this.metrics.addMetric(metrics.requestsInFlight, new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return client.inFlightRequestCount();
-                }
-            });
-            metrics.addMetric(metrics.metadataAge, new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
-                }
-            });
+            this.metrics.addMetric(metrics.requestsInFlight, (config, now) -> 
client.inFlightRequestCount());
+            this.metrics.addMetric(metrics.metadataAge,
+                (config, now) -> (now - metadata.lastSuccessfulUpdate()) / 
1000.0);
 
             this.batchSplitSensor = metrics.sensor("batch-split-rate");
             this.batchSplitSensor.add(new Meter(metrics.batchSplitRate, metrics.batchSplitTotal));
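Two side effects of making SenderMetrics static are worth noting: its collaborators (metadata, client, time) now arrive through the constructor rather than via the enclosing Sender instance, and the anonymous Measurable classes collapse into lambdas because Measurable has a single measure(MetricConfig, long) method. A one-line sketch of the lambda form used above:

    // Measurable is effectively a functional interface, so this is equivalent to the old anonymous class
    Measurable inflight = (config, now) -> client.inFlightRequestCount();
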
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2a3cbe0b72d..4ac5876c5f0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -74,7 +74,6 @@
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -115,6 +114,11 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class KafkaConsumerTest {
     private final String topic = "test";
@@ -1846,18 +1850,16 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset,
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testCloseWithTimeUnit() {
-        KafkaConsumer consumer = EasyMock.partialMockBuilder(KafkaConsumer.class)
-                .addMockedMethod("close", Duration.class).createMock();
-        consumer.close(Duration.ofSeconds(1));
-        EasyMock.expectLastCall();
-        EasyMock.replay(consumer);
+        KafkaConsumer consumer = mock(KafkaConsumer.class);
+        doCallRealMethod().when(consumer).close(anyLong(), any());
         consumer.close(1, TimeUnit.SECONDS);
-        EasyMock.verify(consumer);
+        verify(consumer).close(Duration.ofSeconds(1));
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void testSubscriptionOnInvalidTopic() throws Exception {
+    public void testSubscriptionOnInvalidTopic() {
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster();
         Node node = cluster.nodes().get(0);
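The close() test above shows the translation applied throughout this PR: EasyMock's partialMockBuilder/replay/verify cycle becomes a Mockito mock where only the method under test runs for real (doCallRealMethod), and interactions are asserted afterwards with verify(). The same stub-act-verify shape, on a neutral stand-in collaborator:

    // illustrative only; List is a stand-in for any mocked dependency
    @SuppressWarnings("unchecked")
    List<String> dependency = mock(List.class);
    when(dependency.size()).thenReturn(3);  // stubbing replaces EasyMock's record phase
    assertEquals(3, dependency.size());     // act
    verify(dependency).size();              // verify() replaces EasyMock.verify(...)
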
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 4494fd5bc02..8f6328d6a18 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -32,16 +32,19 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkClientTest {
 
@@ -148,69 +151,41 @@ public void testTimeoutUnsentRequest() {
 
     @Test
     public void doNotBlockIfPollConditionIsSatisfied() {
-        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        NetworkClient mockNetworkClient = mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
                mockNetworkClient, metadata, time, 100, 1000, Integer.MAX_VALUE);

        // expect poll, but with no timeout
-        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(0L), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
-
-        EasyMock.replay(mockNetworkClient);
-
-        consumerClient.poll(time.timer(Long.MAX_VALUE), new ConsumerNetworkClient.PollCondition() {
-            @Override
-            public boolean shouldBlock() {
-                return false;
-            }
-        });
-
-        EasyMock.verify(mockNetworkClient);
+        consumerClient.poll(time.timer(Long.MAX_VALUE), () -> false);
+        verify(mockNetworkClient).poll(eq(0L), anyLong());
     }
 
     @Test
     public void blockWhenPollConditionNotSatisfied() {
         long timeout = 4000L;
 
-        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        NetworkClient mockNetworkClient = mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
                mockNetworkClient, metadata, time, 100, 1000, Integer.MAX_VALUE);

-        EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1);
-        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
-
-        EasyMock.replay(mockNetworkClient);
-
-        consumerClient.poll(time.timer(timeout), new ConsumerNetworkClient.PollCondition() {
-            @Override
-            public boolean shouldBlock() {
-                return true;
-            }
-        });
-
-        EasyMock.verify(mockNetworkClient);
+        when(mockNetworkClient.inFlightRequestCount()).thenReturn(1);
+        consumerClient.poll(time.timer(timeout), () -> true);
+        verify(mockNetworkClient).poll(eq(timeout), anyLong());
     }
 
     @Test
     public void blockOnlyForRetryBackoffIfNoInflightRequests() {
         long retryBackoffMs = 100L;
 
-        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        NetworkClient mockNetworkClient = mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
                mockNetworkClient, metadata, time, retryBackoffMs, 1000, Integer.MAX_VALUE);

-        EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0);
-        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
+        when(mockNetworkClient.inFlightRequestCount()).thenReturn(0);

-        EasyMock.replay(mockNetworkClient);
-
-        consumerClient.poll(time.timer(Long.MAX_VALUE), new ConsumerNetworkClient.PollCondition() {
-            @Override
-            public boolean shouldBlock() {
-                return true;
-            }
-        });
+        consumerClient.poll(time.timer(Long.MAX_VALUE), () -> true);

-        EasyMock.verify(mockNetworkClient);
+        verify(mockNetworkClient).poll(eq(retryBackoffMs), anyLong());
     }
 
     @Test
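Besides the lambda-for-PollCondition cleanup, note what silently disappeared: the andReturn(Collections.emptyList()) stubs for poll(). Mockito mocks return "empty" defaults (empty collections, zeros, false, null), so unstubbed calls like NetworkClient.poll() need no setup. A minimal illustration of that default-answer behavior:

    // unstubbed calls on a Mockito mock return empty values rather than throwing
    NetworkClient client = mock(NetworkClient.class);
    assertTrue(client.poll(0L, 0L).isEmpty());
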
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index dc6fe9f52fc..77fcb517803 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -20,9 +20,11 @@
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
+import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -39,6 +41,7 @@
 import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.MockMetricsReporter;
@@ -46,15 +49,8 @@
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.support.membermodification.MemberModifier;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -66,14 +62,23 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
 public class KafkaProducerTest {
 
     @Test
@@ -209,7 +214,7 @@ public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {

        final Producer<String, String> producer = new KafkaProducer<>(
            new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-            new StringSerializer(), new StringSerializer(), metadata, client);
+            new StringSerializer(), new StringSerializer(), metadata, client, null, time);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicReference<Exception> closeException = new AtomicReference<>();
@@ -271,17 +276,13 @@ public void testInvalidSocketReceiveBufferSize() {
         new KafkaProducer<>(config, new ByteArraySerializer(), new 
ByteArraySerializer());
     }
 
-    @PrepareOnlyThisForTest(Metadata.class)
    @Test
-    public void testMetadataFetch() throws Exception {
+    public void testMetadataFetch() throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
-        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
-
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
+                new StringSerializer()));
        String topic = "topic";
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
        Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
         final Cluster emptyCluster = new Cluster(null, nodes,
                 Collections.emptySet(),
@@ -293,42 +294,48 @@ public void testMetadataFetch() throws Exception {
                Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
                 Collections.emptySet(),
                 Collections.emptySet());
+        Metadata metadata = mock(Metadata.class);
+
+        // Return empty cluster 4 times and cluster from then on
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, cluster);
 
-        // Expect exactly one fetch for each attempt to refresh while topic metadata is not available
-        final int refreshAttempts = 5;
-        EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1);
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
+                metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+            @Override
+            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
+                // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
+                return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000, true));
+            }
+        };
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
         producer.send(record);
-        PowerMock.verify(metadata);
 
-        // Expect exactly one fetch if topic metadata is available
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        // One request update for each empty cluster returned
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(5)).fetch();
+
+        // Should not request update for subsequent `send`
         producer.send(record, null);
-        PowerMock.verify(metadata);
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(6)).fetch();
 
-        // Expect exactly one fetch if topic metadata is available
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        // Should not request update for subsequent `partitionsFor`
         producer.partitionsFor(topic);
-        PowerMock.verify(metadata);
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(7)).fetch();
+
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
     public void testMetadataFetchOnStaleMetadata() throws Exception {
         Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
-        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
-
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
+                new StringSerializer()));
        String topic = "topic";
        ProducerRecord<String, String> initialRecord = new ProducerRecord<>(topic, "value");
        // Create a record with a partition higher than the initial (outdated) partition range
@@ -353,134 +360,145 @@ public void testMetadataFetchOnStaleMetadata() throws Exception {
                         new PartitionInfo(topic, 2, null, null, null)),
                 Collections.emptySet(),
                 Collections.emptySet());
+        Metadata metadata = mock(Metadata.class);
+
+        AtomicInteger invocationCount = new AtomicInteger(0);
+
+        // Return empty cluster 4 times, initialCluster 5 times and extendedCluster after that
+        when(metadata.fetch()).then(invocation -> {
+            invocationCount.incrementAndGet();
+            if (invocationCount.get() > 9)
+                return extendedCluster;
+            else if (invocationCount.get() > 4)
+                return initialCluster;
+            return emptyCluster;
+        });
 
-        // Expect exactly one fetch for each attempt to refresh while topic metadata is not available
-        final int refreshAttempts = 5;
-        EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1);
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
+                metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+            @Override
+            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
+                // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
+                return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000, true));
+            }
+        };
         producer.send(initialRecord);
-        PowerMock.verify(metadata);
 
-        // Expect exactly one fetch if topic metadata is available and records are still within range
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
-        producer.send(initialRecord, null);
-        PowerMock.verify(metadata);
+        // One request update for each empty cluster returned
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(5)).fetch();
+
+        // Should not request update if metadata is available and records are within range
+        producer.send(initialRecord);
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(6)).fetch();
 
-        // Expect exactly two fetches if topic metadata is available but metadata response still returns
+        // One request update followed by exception if topic metadata is available but metadata response still returns
         // the same partition size (either because metadata are still stale at the broker too or because
-        // there weren't any partitions added in the first place).
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        // there weren't any partitions added in the first place)
         try {
-            producer.send(extendedRecord, null);
+            producer.send(extendedRecord);
             fail("Expected KafkaException to be raised");
         } catch (KafkaException e) {
             // expected
         }
-        PowerMock.verify(metadata);
-
-        // Expect exactly two fetches if topic metadata is available but outdated for the given record
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andReturn(extendedCluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
-        producer.send(extendedRecord, null);
-        PowerMock.verify(metadata);
+        verify(metadata, times(5)).requestUpdate();
+        verify(metadata, times(5)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(8)).fetch();
+
+        // One request update if metadata is available but outdated for the given record
+        producer.send(extendedRecord);
+        verify(metadata, times(6)).requestUpdate();
+        verify(metadata, times(6)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(10)).fetch();
+
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
-    public void testTopicRefreshInMetadata() throws Exception {
+    public void testTopicRefreshInMetadata() throws InterruptedException {
         Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
+            new StringSerializer()));
        long refreshBackoffMs = 500L;
        long metadataExpireMs = 60000L;
        final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true,
                 true, new ClusterResourceListeners());
         final Time time = new MockTime();
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
-        MemberModifier.field(KafkaProducer.class, "time").set(producer, time);
         final String topic = "topic";
-
-        Thread t = new Thread(() -> {
-            long startTimeMs = System.currentTimeMillis();
-            for (int i = 0; i < 10; i++) {
-                while (!metadata.updateRequested() && 
System.currentTimeMillis() - startTimeMs < 1000)
-                    Thread.yield();
-                metadata.update(Cluster.empty(), Collections.singleton(topic), 
time.milliseconds());
-                time.sleep(60 * 1000L);
+        try (KafkaProducer<String, String> producer = new 
KafkaProducer<>(config, null, null, metadata,
+                null, null, time)) {
+
+            Thread t = new Thread(() -> {
+                long startTimeMs = System.currentTimeMillis();
+                for (int i = 0; i < 10; i++) {
+                    while (!metadata.updateRequested() && 
System.currentTimeMillis() - startTimeMs < 1000)
+                        Thread.yield();
+                    metadata.update(Cluster.empty(), 
Collections.singleton(topic), time.milliseconds());
+                    time.sleep(60 * 1000L);
+                }
+            });
+            t.start();
+            try {
+                producer.partitionsFor(topic);
+                fail("Expect TimeoutException");
+            } catch (TimeoutException e) {
+                // skip
             }
-        });
-        t.start();
-        try {
-            producer.partitionsFor(topic);
-            fail("Expect TimeoutException");
-        } catch (TimeoutException e) {
-            // skip
+            t.join();
         }
-        Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
+        assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
     }
 
-    @SuppressWarnings("unchecked") // safe as generic parameters won't vary
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
-    public void testHeadersWithExtendedClasses() throws Exception {
+    @Deprecated
+    public void testHeadersWithExtendedClasses() {
         doTestHeaders(ExtendedSerializer.class);
     }
 
-    @SuppressWarnings("unchecked")
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
-    public void testHeaders() throws Exception {
+    public void testHeaders() {
         doTestHeaders(Serializer.class);
     }
 
-    private <T extends Serializer<String>> void doTestHeaders(Class<T> 
serializerClassToMock) throws Exception {
+    private <T extends Serializer<String>> void doTestHeaders(Class<T> 
serializerClassToMock) {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
-        T keySerializer = PowerMock.createNiceMock(serializerClassToMock);
-        T valueSerializer = PowerMock.createNiceMock(serializerClassToMock);
-
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, 
keySerializer, valueSerializer);
-        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, 
metadata);
+        @SuppressWarnings("unchecked") // it is safe to suppress, since this 
is a mock class
+        Serializer<String> keySerializer = mock(serializerClassToMock);
+        @SuppressWarnings("unchecked")
+        Serializer<String> valueSerializer = mock(serializerClassToMock);
+        ProducerConfig config = new 
ProducerConfig(ProducerConfig.addSerializerToConfig(props, keySerializer,
+                valueSerializer));
 
         String topic = "topic";
         final Cluster cluster = new Cluster(
                 "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
-                Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
                 Collections.emptySet(),
                 Collections.emptySet());
+        Metadata metadata = new Metadata(0, 90000, true);
+        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
 
+        KafkaProducer<String, String> producer = new KafkaProducer<>(config, 
keySerializer, valueSerializer,
+                metadata, null, null, Time.SYSTEM);
 
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).anyTimes();
-
-        PowerMock.replay(metadata);
+        when(keySerializer.serialize(any(), any(), any())).then(invocation ->
+                invocation.<String>getArgument(2).getBytes());
+        when(valueSerializer.serialize(any(), any(), any())).then(invocation ->
+                invocation.<String>getArgument(2).getBytes());
 
         String value = "value";
-
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 
value);
-        EasyMock.expect(keySerializer.serialize(topic, record.headers(), 
null)).andReturn(null).once();
-        EasyMock.expect(valueSerializer.serialize(topic, record.headers(), 
value)).andReturn(value.getBytes()).once();
-
-        PowerMock.replay(keySerializer);
-        PowerMock.replay(valueSerializer);
-
+        String key = "key";
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 
key, value);
 
         //ensure headers can be mutated pre send.
         record.headers().add(new RecordHeader("test", "header2".getBytes()));
-
         producer.send(record, null);
 
         //ensure headers are closed and cannot be mutated post send
@@ -492,11 +510,12 @@ public void testHeaders() throws Exception {
         }
 
         //ensure existing headers are not changed, and last header for key is still original value
-        assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));
+        assertArrayEquals(record.headers().lastHeader("test").value(), "header2".getBytes());
 
-        PowerMock.verify(valueSerializer);
-        PowerMock.verify(keySerializer);
+        verify(valueSerializer).serialize(topic, record.headers(), value);
+        verify(keySerializer).serialize(topic, record.headers(), key);
 
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -522,40 +541,38 @@ public void testMetricConfigRecordingLevel() {
         }
     }
 
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
-    public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
+    public void testInterceptorPartitionSetOnTooLargeRecord() {
         Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
        props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
+                new StringSerializer()));
        String topic = "topic";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
 
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, 
new StringSerializer(),
-                new StringSerializer());
-        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, 
metadata);
+        Metadata metadata = new Metadata(0, 90000, true);
         final Cluster cluster = new Cluster(
             "dummy",
             Collections.singletonList(new Node(0, "host1", 1000)),
-            Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+            Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
             Collections.emptySet(),
             Collections.emptySet());
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
 
-        // Mock interceptors field
         @SuppressWarnings("unchecked") // it is safe to suppress, since this 
is a mock class
-        ProducerInterceptors<String, String> interceptors = 
PowerMock.createMock(ProducerInterceptors.class);
-        EasyMock.expect(interceptors.onSend(record)).andReturn(record);
-        interceptors.onSendError(EasyMock.eq(record), EasyMock.notNull(), 
EasyMock.notNull());
-        EasyMock.expectLastCall();
-        MemberModifier.field(KafkaProducer.class, 
"interceptors").set(producer, interceptors);
-
-        PowerMock.replay(metadata);
-        EasyMock.replay(interceptors);
+                ProducerInterceptors<String, String> interceptors = 
mock(ProducerInterceptors.class);
+        KafkaProducer<String, String> producer = new KafkaProducer<>(config, 
null, null,
+                metadata, null, interceptors, Time.SYSTEM);
+
+        when(interceptors.onSend(any())).then(invocation -> 
invocation.getArgument(0));
+
         producer.send(record);
 
-        EasyMock.verify(interceptors);
+        verify(interceptors).onSend(record);
+        verify(interceptors).onSendError(eq(record), notNull(), notNull());
+
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -577,7 +594,7 @@ public void testInitTransactionTimeout() {
         props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
-        Time time = new MockTime();
+        Time time = Time.SYSTEM;
         Cluster cluster = TestUtils.singletonCluster("topic", 1);
         Node node = cluster.nodes().get(0);
 
@@ -589,7 +606,7 @@ public void testInitTransactionTimeout() {
 
         try (Producer<String, String> producer = new KafkaProducer<>(
                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client)) {
+                null, null, metadata, client, null, time)) {
             producer.initTransactions();
             fail("initTransactions() should have raised TimeoutException");
         }
@@ -614,7 +631,7 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {

        Producer<String, String> producer = new KafkaProducer<>(
                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client);
+                null, null, metadata, client, null, time);
         try {
             producer.initTransactions();
         } catch (TimeoutException e) {
@@ -630,7 +647,6 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
 
     @Test
     public void testSendToInvalidTopic() throws Exception {
-
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
         props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
@@ -647,9 +663,9 @@ public void testSendToInvalidTopic() throws Exception {
 
         Producer<String, String> producer = new KafkaProducer<>(new 
ProducerConfig(
                 ProducerConfig.addSerializerToConfig(props, new 
StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, 
client);
+                null, null, metadata, client, null, time);
 
-        String invalidTopicName = "topic abc";          // Invalid topic name 
due to space
+        String invalidTopicName = "topic abc"; // Invalid topic name due to 
space
         ProducerRecord<String, String> record = new 
ProducerRecord<>(invalidTopicName, "HelloKafka");
 
         Set<String> invalidTopic = new HashSet<>();
@@ -665,8 +681,11 @@ public void testSendToInvalidTopic() throws Exception {
 
         Future<RecordMetadata> future = producer.send(record);
 
-        assertEquals("Cluster has incorrect invalid topic list.", 
metaDataUpdateResponseCluster.invalidTopics(), 
metadata.fetch().invalidTopics());
+        assertEquals("Cluster has incorrect invalid topic list", 
metaDataUpdateResponseCluster.invalidTopics(),
+                metadata.fetch().invalidTopics());
         TestUtils.assertFutureError(future, InvalidTopicException.class);
+
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -689,7 +708,7 @@ public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException

        Producer<String, String> producer = new KafkaProducer<>(
                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client);
+                new StringSerializer(), new StringSerializer(), metadata, client, null, time);
 
         ExecutorService executor = Executors.newSingleThreadExecutor();
         final AtomicReference<Exception> sendException = new 
AtomicReference<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 23fc5411b3c..ce74eb1f0f1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -16,11 +16,8 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
@@ -36,25 +33,16 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.anyLong;
-import static org.easymock.EasyMock.anyDouble;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.anyString;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
-
-@RunWith(PowerMockRunner.class)
 public class BufferPoolTest {
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics(time);
@@ -241,36 +229,25 @@ public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception
         // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty
         assertEquals(pool.queued(), 0);
     }
-
-    @PrepareForTest({Sensor.class, MetricName.class})
+    
     @Test
     public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
-        Metrics mockedMetrics = createNiceMock(Metrics.class);
-        Sensor mockedSensor = createNiceMock(Sensor.class);
-        MetricName metricName = createNiceMock(MetricName.class);
-        MetricName rateMetricName = createNiceMock(MetricName.class);
-        MetricName totalMetricName = createNiceMock(MetricName.class);
-
-        expect(mockedMetrics.sensor(BufferPool.WAIT_TIME_SENSOR_NAME)).andReturn(mockedSensor);
+        BufferPool bufferPool = spy(new BufferPool(2, 1, new Metrics(), time, metricGroup));
+        doThrow(new OutOfMemoryError()).when(bufferPool).recordWaitTime(anyLong());
 
-        mockedSensor.record(anyDouble(), anyLong());
-        expectLastCall().andThrow(new OutOfMemoryError());
-        expect(mockedMetrics.metricName(anyString(), eq(metricGroup), anyString())).andReturn(metricName);
-        expect(mockedSensor.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName))).andReturn(true);
-
-        replay(mockedMetrics, mockedSensor, metricName);
-
-        BufferPool bufferPool = new BufferPool(2, 1, mockedMetrics, time,  metricGroup);
         bufferPool.allocate(1, 0);
         try {
             bufferPool.allocate(2, 1000);
-            assertTrue("Expected oom.", false);
+            fail("Expected oom.");
         } catch (OutOfMemoryError expected) {
         }
         assertEquals(1, bufferPool.availableMemory());
         assertEquals(0, bufferPool.queued());
+        assertEquals(1, bufferPool.unallocatedMemory());
         //This shouldn't timeout
         bufferPool.allocate(1, 0);
+
+        verify(bufferPool).recordWaitTime(anyLong());
     }
 
     private static class BufferPoolAllocator implements Runnable {
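
The hunk above is the heart of the PowerMock removal: rather than preparing Sensor and
MetricName for class-level mocking, the test now spies on a real BufferPool and stubs only
the metric-recording hook. A minimal, self-contained sketch of that spy/doThrow/verify
pattern, using a hypothetical Recorder class in place of the real BufferPool:

    import static org.mockito.ArgumentMatchers.anyLong;
    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.verify;

    public class SpySketch {
        static class Recorder {
            void record(long value) { /* real metrics call */ }
        }

        public static void main(String[] args) {
            // A spy delegates to the real object unless a method is stubbed.
            Recorder recorder = spy(new Recorder());
            // doThrow().when() is the void-method form of stubbing.
            doThrow(new OutOfMemoryError()).when(recorder).record(anyLong());
            try {
                recorder.record(42L);            // throws the stubbed error
            } catch (OutOfMemoryError expected) {
                // expected, as in the test above
            }
            verify(recorder).record(anyLong());  // the call was still captured
        }
    }
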
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index f93f3432bdd..f2a34f6c89c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -84,24 +84,23 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
 
-import static org.easymock.EasyMock.anyBoolean;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyLong;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.geq;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
+import static org.mockito.AdditionalMatchers.geq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
 
 public class SenderTest {
 
@@ -140,7 +139,8 @@ public void tearDown() {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
+                null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, 
client.inFlightRequestCount());
@@ -2037,33 +2037,27 @@ public void testExpiredBatchDoesNotRetry() throws 
Exception {
 
     @Test
     public void testResetNextBatchExpiry() throws Exception {
-        MockClient delegateClient = new MockClient(time);
-        client = mock(MockClient.class);
-        expect(client.ready(anyObject(), anyLong())).andDelegateTo(delegateClient).anyTimes();
-        expect(
-            client.newClientRequest(
-                anyString(), anyObject(), anyLong(), anyBoolean(), anyInt(), anyObject()))
-            .andDelegateTo(delegateClient).anyTimes();
-        client.send(anyObject(), anyLong());
-        expectLastCall().andDelegateTo(delegateClient).anyTimes();
-        expect(client.poll(eq(0L), anyLong())).andDelegateTo(delegateClient).times(1);
-        expect(client.poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong()))
-            .andDelegateTo(delegateClient)
-            .times(1);
-        expect(client.poll(geq(1L), anyLong())).andDelegateTo(delegateClient).times(1);
-        replay(client);
+        client = spy(new MockClient(time));
 
         setupWithTransactionState(null);
 
-        accumulator.append(
-            tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null,
+                MAX_BLOCK_TIMEOUT);
 
         sender.run(time.milliseconds());
         sender.run(time.milliseconds());
         time.setCurrentTimeMs(time.milliseconds() + accumulator.getDeliveryTimeoutMs() + 1);
         sender.run(time.milliseconds());
 
-        verify(client);
+        InOrder inOrder = inOrder(client);
+        inOrder.verify(client, atLeastOnce()).ready(any(), anyLong());
+        inOrder.verify(client, atLeastOnce()).newClientRequest(anyString(), any(), anyLong(), anyBoolean(), anyInt(),
+                any());
+        inOrder.verify(client, atLeastOnce()).send(any(), anyLong());
+        inOrder.verify(client).poll(eq(0L), anyLong());
+        inOrder.verify(client).poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong());
+        inOrder.verify(client).poll(geq(1L), anyLong());
+
     }
 
     private class MatchingBufferPool extends BufferPool {
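
EasyMock's replay()/verify() pair scripted the expected call sequence up front; in Mockito,
as in testResetNextBatchExpiry above, the collaborator is exercised first and the ordering
is asserted afterwards with an InOrder verifier. A minimal sketch of the idiom against a
plain java.util.List mock:

    import static org.mockito.Mockito.atLeastOnce;
    import static org.mockito.Mockito.inOrder;
    import static org.mockito.Mockito.mock;

    import java.util.List;
    import org.mockito.InOrder;

    public class InOrderSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            List<String> list = mock(List.class);
            list.add("a");
            list.add("a");
            list.clear();

            // Verification runs after the interactions, in declared order.
            InOrder inOrder = inOrder(list);
            inOrder.verify(list, atLeastOnce()).add("a");
            inOrder.verify(list).clear();  // fails if clear() preceded the adds
        }
    }
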
diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
index c2b89fe6882..27daf0face8 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.easymock.EasyMock;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -35,13 +34,14 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class ChannelBuildersTest {
 
     @Test
     public void testCreateOldPrincipalBuilder() throws Exception {
-        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
-        Authenticator authenticator = EasyMock.mock(Authenticator.class);
+        TransportLayer transportLayer = mock(TransportLayer.class);
+        Authenticator authenticator = mock(Authenticator.class);
 
         Map<String, Object> configs = new HashMap<>();
         configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index cef7c7fae49..6cf75861122 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -29,7 +29,6 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.IMocksControl;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,15 +52,17 @@
 import java.util.Set;
 import java.util.Optional;
 
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -561,31 +562,20 @@ private Thread createSender(InetSocketAddress serverAddress, byte[] payload) {
      */
     @Test
     public void testConnectDisconnectDuringInSinglePoll() throws Exception {
-        IMocksControl control = createControl();
-
         // channel is connected, not ready and it throws an exception during prepare
-        KafkaChannel kafkaChannel = control.createMock(KafkaChannel.class);
-        expect(kafkaChannel.id()).andStubReturn("1");
-        expect(kafkaChannel.socketDescription()).andStubReturn("");
-        expect(kafkaChannel.state()).andStubReturn(ChannelState.NOT_CONNECTED);
-        expect(kafkaChannel.finishConnect()).andReturn(true);
-        expect(kafkaChannel.isConnected()).andStubReturn(true);
-        // record void method invocations
-        kafkaChannel.disconnect();
-        kafkaChannel.close();
-        expect(kafkaChannel.ready()).andReturn(false).anyTimes();
-        // prepare throws an exception
-        kafkaChannel.prepare();
-        expectLastCall().andThrow(new IOException());
-
-        SelectionKey selectionKey = control.createMock(SelectionKey.class);
-        expect(kafkaChannel.selectionKey()).andStubReturn(selectionKey);
-        expect(selectionKey.channel()).andReturn(SocketChannel.open());
-        expect(selectionKey.readyOps()).andStubReturn(SelectionKey.OP_CONNECT);
-        selectionKey.cancel();
-        expectLastCall();
-
-        control.replay();
+        KafkaChannel kafkaChannel = mock(KafkaChannel.class);
+        when(kafkaChannel.id()).thenReturn("1");
+        when(kafkaChannel.socketDescription()).thenReturn("");
+        when(kafkaChannel.state()).thenReturn(ChannelState.NOT_CONNECTED);
+        when(kafkaChannel.finishConnect()).thenReturn(true);
+        when(kafkaChannel.isConnected()).thenReturn(true);
+        when(kafkaChannel.ready()).thenReturn(false);
+        doThrow(new IOException()).when(kafkaChannel).prepare();
+
+        SelectionKey selectionKey = mock(SelectionKey.class);
+        when(kafkaChannel.selectionKey()).thenReturn(selectionKey);
+        when(selectionKey.channel()).thenReturn(SocketChannel.open());
+        when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_CONNECT);
 
         selectionKey.attach(kafkaChannel);
         Set<SelectionKey> selectionKeys = Utils.mkSet(selectionKey);
@@ -595,7 +585,10 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception {
         assertTrue(selector.disconnected().containsKey(kafkaChannel.id()));
         assertNull(selectionKey.attachment());
 
-        control.verify();
+        verify(kafkaChannel, atLeastOnce()).ready();
+        verify(kafkaChannel).disconnect();
+        verify(kafkaChannel).close();
+        verify(selectionKey).cancel();
     }
 
     @Test
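
The SelectorTest rewrite shows the two Mockito stubbing forms side by side:
when(...).thenReturn(...) for methods with a return value, and doThrow(...).when(...) for a
void method that should fail (prepare() here). A minimal sketch of the combination, using a
hypothetical Channel interface:

    import java.io.IOException;

    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class StubSketch {
        interface Channel {
            boolean finishConnect() throws IOException;
            void prepare() throws IOException;
        }

        public static void main(String[] args) throws IOException {
            Channel channel = mock(Channel.class);
            when(channel.finishConnect()).thenReturn(true);     // return-value stub
            doThrow(new IOException()).when(channel).prepare(); // void-method stub

            System.out.println(channel.finishConnect());        // prints "true"
            try {
                channel.prepare();                              // throws IOException
            } catch (IOException expected) {
            }
        }
    }
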
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 77de33fcea7..4b2b3618a4a 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -23,8 +23,6 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -45,8 +43,14 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-public class FileRecordsTest extends EasyMockSupport {
+public class FileRecordsTest {
 
     private byte[][] values = new byte[][] {
             "abcd".getBytes(),
@@ -66,10 +70,7 @@ public void setup() throws IOException {
     public void testAppendProtectsFromOverflow() throws Exception {
         File fileMock = mock(File.class);
         FileChannel fileChannelMock = mock(FileChannel.class);
-        EasyMock.expect(fileChannelMock.size()).andStubReturn((long) Integer.MAX_VALUE);
-        EasyMock.expect(fileChannelMock.position(Integer.MAX_VALUE)).andReturn(fileChannelMock);
-
-        replayAll();
+        when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE);
 
         FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false);
         append(records, values);
@@ -79,9 +80,7 @@ public void testAppendProtectsFromOverflow() throws Exception {
     public void testOpenOversizeFile() throws Exception {
         File fileMock = mock(File.class);
         FileChannel fileChannelMock = mock(FileChannel.class);
-        EasyMock.expect(fileChannelMock.size()).andStubReturn(Integer.MAX_VALUE + 5L);
-
-        replayAll();
+        when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L);
 
         new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false);
     }
@@ -262,16 +261,16 @@ public void testTruncate() throws IOException {
      */
     @Test
     public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
 
-        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
-        EasyMock.expect(channelMock.position(42L)).andReturn(null);
-        EasyMock.replay(channelMock);
+        when(channelMock.size()).thenReturn(42L);
+        when(channelMock.position(42L)).thenReturn(null);
 
         FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
         fileRecords.truncateTo(42);
 
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).size();
+        verify(channelMock, times(0)).truncate(anyLong());
     }
 
     /**
@@ -280,11 +279,9 @@ public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException {
      */
     @Test
     public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
 
-        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
-        EasyMock.expect(channelMock.position(42L)).andReturn(null);
-        EasyMock.replay(channelMock);
+        when(channelMock.size()).thenReturn(42L);
 
         FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
 
@@ -295,7 +292,7 @@ public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOExcepti
             // expected
         }
 
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).size();
     }
 
     /**
@@ -303,17 +300,16 @@ public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOExcepti
      */
     @Test
     public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
 
-        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
-        EasyMock.expect(channelMock.position(42L)).andReturn(null).once();
-        EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once();
-        EasyMock.replay(channelMock);
+        when(channelMock.size()).thenReturn(42L);
+        when(channelMock.truncate(anyLong())).thenReturn(channelMock);
 
         FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
         fileRecords.truncateTo(23);
 
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).size();
+        verify(channelMock).truncate(23);
     }
 
     /**
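
Because Mockito mocks are lenient (an unexpected call returns a default value rather than
failing, unlike EasyMock's strict replay model), the truncate tests above must state the
negative expectation explicitly with times(0). A minimal sketch of asserting that a call
never happened, on a mocked java.util.List:

    import static org.mockito.ArgumentMatchers.anyInt;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.never;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    import java.util.List;

    public class NeverSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            List<String> list = mock(List.class);
            list.size();                              // only size() is touched
            verify(list).size();
            verify(list, times(0)).remove(anyInt());  // equivalent to never()
            verify(list, never()).clear();
        }
    }
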
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index 25bcd5008b2..a05a8502bf1 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -20,11 +20,8 @@
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
-import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
 import org.junit.Test;
 
 import javax.net.ssl.SSLSession;
@@ -33,8 +30,13 @@
 import java.security.Principal;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
+public class DefaultKafkaPrincipalBuilderTest {
 
     @Test
     @SuppressWarnings("deprecation")
@@ -43,12 +45,7 @@ public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws Exception
         Authenticator authenticator = mock(Authenticator.class);
         PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class);
 
-        EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator))
-                .andReturn(new DummyPrincipal("foo"));
-        oldPrincipalBuilder.close();
-        EasyMock.expectLastCall();
-
-        replayAll();
+        when(oldPrincipalBuilder.buildPrincipal(any(), any())).thenReturn(new DummyPrincipal("foo"));
 
         DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
                 transportLayer, oldPrincipalBuilder, null);
@@ -59,15 +56,17 @@ public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws Exception
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(oldPrincipalBuilder).buildPrincipal(transportLayer, authenticator);
+        verify(oldPrincipalBuilder).close();
     }
 
     @Test
     public void testReturnAnonymousPrincipalForPlaintext() throws Exception {
-        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
-        assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
-                new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
-        builder.close();
+        try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null)) {
+            assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
+                    new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
+        }
     }
 
     @Test
@@ -78,12 +77,8 @@ public void testUseOldPrincipalBuilderForSslIfProvided() throws Exception {
         PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class);
         SSLSession session = mock(SSLSession.class);
 
-        EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator))
-                .andReturn(new DummyPrincipal("foo"));
-        oldPrincipalBuilder.close();
-        EasyMock.expectLastCall();
-
-        replayAll();
+        when(oldPrincipalBuilder.buildPrincipal(any(), any()))
+                .thenReturn(new DummyPrincipal("foo"));
 
         DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
                 transportLayer, oldPrincipalBuilder, null);
@@ -94,16 +89,16 @@ public void testUseOldPrincipalBuilderForSslIfProvided() throws Exception {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(oldPrincipalBuilder).buildPrincipal(transportLayer, authenticator);
+        verify(oldPrincipalBuilder).close();
     }
 
     @Test
     public void testUseSessionPeerPrincipalForSsl() throws Exception {
         SSLSession session = mock(SSLSession.class);
 
-        EasyMock.expect(session.getPeerPrincipal()).andReturn(new DummyPrincipal("foo"));
-
-        replayAll();
+        when(session.getPeerPrincipal()).thenReturn(new DummyPrincipal("foo"));
 
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
 
@@ -113,17 +108,16 @@ public void testUseSessionPeerPrincipalForSsl() throws Exception {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(session, atLeastOnce()).getPeerPrincipal();
     }
 
     @Test
     public void testPrincipalBuilderScram() throws Exception {
         SaslServer server = mock(SaslServer.class);
 
-        EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
-        EasyMock.expect(server.getAuthorizationID()).andReturn("foo");
-
-        replayAll();
+        when(server.getMechanismName()).thenReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
+        when(server.getAuthorizationID()).thenReturn("foo");
 
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
 
@@ -133,7 +127,9 @@ public void testPrincipalBuilderScram() throws Exception {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(server, atLeastOnce()).getMechanismName();
+        verify(server, atLeastOnce()).getAuthorizationID();
     }
 
     @Test
@@ -141,12 +137,9 @@ public void testPrincipalBuilderGssapi() throws Exception {
         SaslServer server = mock(SaslServer.class);
         KerberosShortNamer kerberosShortNamer = mock(KerberosShortNamer.class);
 
-        EasyMock.expect(server.getMechanismName()).andReturn(SaslConfigs.GSSAPI_MECHANISM);
-        EasyMock.expect(server.getAuthorizationID()).andReturn("foo/h...@realm.com");
-        EasyMock.expect(kerberosShortNamer.shortName(EasyMock.anyObject(KerberosName.class)))
-                .andReturn("foo");
-
-        replayAll();
+        when(server.getMechanismName()).thenReturn(SaslConfigs.GSSAPI_MECHANISM);
+        when(server.getAuthorizationID()).thenReturn("foo/h...@realm.com");
+        when(kerberosShortNamer.shortName(any())).thenReturn("foo");
 
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
 
@@ -156,7 +149,10 @@ public void testPrincipalBuilderGssapi() throws Exception {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(server, atLeastOnce()).getMechanismName();
+        verify(server, atLeastOnce()).getAuthorizationID();
+        verify(kerberosShortNamer, atLeastOnce()).shortName(any());
     }
 
     private static class DummyPrincipal implements Principal {
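
The principal-builder tests show the overall shape of every migration in this patch:
EasyMock's three-phase record/replay/verifyAll script collapses into stub first, exercise,
then verify only the calls that matter. A minimal sketch of that flow, with a hypothetical
Greeter interface:

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    public class MigrationSketch {
        interface Greeter {
            String greet(String name);
        }

        public static void main(String[] args) {
            Greeter greeter = mock(Greeter.class);
            when(greeter.greet(anyString())).thenReturn("hi"); // stub: no replay() step

            String reply = greeter.greet("foo");               // exercise

            verify(greeter).greet("foo");                      // verify after the fact
            System.out.println(reply);                         // prints "hi"
        }
    }
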
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 1ae83eeace1..d62261ac363 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -28,9 +28,6 @@
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
 import org.junit.Test;
 
 import javax.security.auth.Subject;
@@ -42,33 +39,32 @@
 
 import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class SaslServerAuthenticatorTest {
 
     @Test(expected = InvalidReceiveException.class)
     public void testOversizeRequest() throws IOException {
-        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
+        TransportLayer transportLayer = mock(TransportLayer.class);
         Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
         SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
 
-        final Capture<ByteBuffer> size = EasyMock.newCapture();
-        EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
-            @Override
-            public Integer answer() {
-                size.getValue().putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
-                return 4;
-            }
+        when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
+            invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
+            return 4;
         });
-
-        EasyMock.replay(transportLayer);
-
         authenticator.authenticate();
+        verify(transportLayer).read(any(ByteBuffer.class));
     }
 
     @Test
     public void testUnexpectedRequestType() throws IOException {
-        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
+        TransportLayer transportLayer = mock(TransportLayer.class);
         Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
         SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
@@ -76,33 +72,23 @@ public void testUnexpectedRequestType() throws IOException {
         final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243);
         final Struct headerStruct = header.toStruct();
 
-        final Capture<ByteBuffer> size = EasyMock.newCapture();
-        EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
-            @Override
-            public Integer answer() {
-                size.getValue().putInt(headerStruct.sizeOf());
-                return 4;
-            }
-        });
-
-        final Capture<ByteBuffer> payload = EasyMock.newCapture();
-        EasyMock.expect(transportLayer.read(EasyMock.capture(payload))).andAnswer(new IAnswer<Integer>() {
-            @Override
-            public Integer answer() {
-                // serialize only the request header. the authenticator should not parse beyond this
-                headerStruct.writeTo(payload.getValue());
-                return headerStruct.sizeOf();
-            }
+        when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
+            invocation.<ByteBuffer>getArgument(0).putInt(headerStruct.sizeOf());
+            return 4;
+        }).then(invocation -> {
+            // serialize only the request header. the authenticator should not parse beyond this
+            headerStruct.writeTo(invocation.getArgument(0));
+            return headerStruct.sizeOf();
         });
 
-        EasyMock.replay(transportLayer);
-
         try {
             authenticator.authenticate();
             fail("Expected authenticate() to raise an exception");
         } catch (IllegalSaslStateException e) {
             // expected exception
         }
+
+        verify(transportLayer, times(2)).read(any(ByteBuffer.class));
     }
 
     private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer, String mechanism) throws IOException {
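
testUnexpectedRequestType above leans on consecutive stubbing: chaining .then(...) makes
the first read() supply the size header and the second supply the payload, replacing two
separate EasyMock Capture/IAnswer pairs. A minimal sketch of chained answers on a mocked
java.nio.channels.ReadableByteChannel:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class AnswerSketch {
        public static void main(String[] args) throws IOException {
            ReadableByteChannel channel = mock(ReadableByteChannel.class);
            // First call writes a 4-byte length, second call writes the payload.
            when(channel.read(any(ByteBuffer.class)))
                .then(invocation -> {
                    invocation.<ByteBuffer>getArgument(0).putInt(2);
                    return 4;
                })
                .then(invocation -> {
                    invocation.<ByteBuffer>getArgument(0).put(new byte[]{1, 2});
                    return 2;
                });

            ByteBuffer buffer = ByteBuffer.allocate(16);
            System.out.println(channel.read(buffer)); // prints "4"
            System.out.println(channel.read(buffer)); // prints "2"
        }
    }
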
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
index 51d60123c7f..2ecde99b704 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
@@ -21,6 +21,8 @@
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,7 +42,6 @@
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
 import org.apache.kafka.common.security.auth.SaslExtensions;
-import org.easymock.EasyMock;
 import org.junit.Test;
 
 public class OAuthBearerLoginModuleTest {
@@ -124,12 +125,10 @@ public void login1Commit1Login2Commit2Logout1Login3Commit3Logout2() throws Login
         Set<Object> publicCredentials = subject.getPublicCredentials();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-            EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)};
-        SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class),
-            EasyMock.mock(SaslExtensions.class), EasyMock.mock(SaslExtensions.class)};
-        EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing
-        EasyMock.replay(extensions[0], extensions[2]);
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+            mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)};
+        SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class),
+            mock(SaslExtensions.class), mock(SaslExtensions.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions);
 
         // Create login modules
@@ -207,6 +206,9 @@ public void login1Commit1Login2Commit2Logout1Login3Commit3Logout2() throws Login
         assertEquals(1, publicCredentials.size());
         assertSame(tokens[2], privateCredentials.iterator().next());
         assertSame(extensions[2], publicCredentials.iterator().next());
+
+        verifyZeroInteractions((Object[]) tokens);
+        verifyZeroInteractions((Object[]) extensions);
     }
 
     @Test
@@ -220,12 +222,10 @@ public void login1Commit1Logout1Login2Commit2Logout2() throws LoginException {
         Set<Object> publicCredentials = subject.getPublicCredentials();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-            EasyMock.mock(OAuthBearerToken.class)};
-        SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class),
-            EasyMock.mock(SaslExtensions.class)};
-        EasyMock.replay(tokens[0], tokens[1]); // expect nothing
-        EasyMock.replay(extensions[0], extensions[1]);
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+            mock(OAuthBearerToken.class)};
+        SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class),
+            mock(SaslExtensions.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions);
 
         // Create login modules
@@ -268,6 +268,9 @@ public void login1Commit1Logout1Login2Commit2Logout2() throws LoginException {
         // Should have nothing again
         assertEquals(0, privateCredentials.size());
         assertEquals(0, publicCredentials.size());
+
+        verifyZeroInteractions((Object[]) tokens);
+        verifyZeroInteractions((Object[]) extensions);
     }
 
     @Test
@@ -280,12 +283,10 @@ public void loginAbortLoginCommitLogout() throws LoginException {
         Set<Object> publicCredentials = subject.getPublicCredentials();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-            EasyMock.mock(OAuthBearerToken.class)};
-        SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class),
-            EasyMock.mock(SaslExtensions.class)};
-        EasyMock.replay(tokens[0], tokens[1]); // expect nothing
-        EasyMock.replay(extensions[0], extensions[1]);
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+            mock(OAuthBearerToken.class)};
+        SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class),
+            mock(SaslExtensions.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions);
 
         // Create login module
@@ -319,6 +320,9 @@ public void loginAbortLoginCommitLogout() throws LoginException {
         // Should have nothing again
         assertEquals(0, privateCredentials.size());
         assertEquals(0, publicCredentials.size());
+
+        verifyZeroInteractions((Object[]) tokens);
+        verifyZeroInteractions((Object[]) extensions);
     }
 
     @Test
@@ -332,12 +336,10 @@ public void login1Commit1Login2Abort2Login3Commit3Logout3() throws LoginExceptio
         Set<Object> publicCredentials = subject.getPublicCredentials();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-            EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)};
-        SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class),
-            EasyMock.mock(SaslExtensions.class), EasyMock.mock(SaslExtensions.class)};
-        EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing
-        EasyMock.replay(extensions[0], extensions[1], extensions[2]);
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+            mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)};
+        SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class),
+            mock(SaslExtensions.class), mock(SaslExtensions.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions);
 
         // Create login modules
@@ -402,6 +404,9 @@ public void login1Commit1Login2Abort2Login3Commit3Logout3() throws LoginExceptio
         assertSame(tokens[2], privateCredentials.iterator().next());
         assertEquals(1, publicCredentials.size());
         assertSame(extensions[2], publicCredentials.iterator().next());
+
+        verifyZeroInteractions((Object[]) tokens);
+        verifyZeroInteractions((Object[]) extensions);
     }
 
     /**
@@ -413,9 +418,8 @@ public void commitDoesNotThrowOnUnsupportedExtensionsCallback() throws LoginExce
         Subject subject = new Subject();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-                EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)};
-        EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+                mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, new SaslExtensions[] {RAISE_UNSUPPORTED_CB_EXCEPTION_FLAG});
 
         // Create login modules
@@ -429,5 +433,7 @@ public void commitDoesNotThrowOnUnsupportedExtensionsCallback() throws LoginExce
         SaslExtensions extensions = subject.getPublicCredentials(SaslExtensions.class).iterator().next();
         assertNotNull(extensions);
         assertTrue(extensions.map().isEmpty());
+
+        verifyZeroInteractions((Object[]) tokens);
     }
 }
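
An EasyMock.replay(mock) with no recorded expectations meant "expect nothing"; the Mockito
equivalent is the explicit verifyZeroInteractions(...) calls added at the end of each test
above. A minimal sketch:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verifyZeroInteractions;

    public class ZeroInteractionsSketch {
        public static void main(String[] args) {
            Runnable first = mock(Runnable.class);
            Runnable second = mock(Runnable.class);
            // ... code under test that must never touch these mocks ...
            verifyZeroInteractions(first, second); // fails if any method was invoked
        }
    }
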
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
index fad743136f3..6d23f620599 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
@@ -22,7 +22,6 @@
 import org.apache.kafka.common.security.auth.SaslExtensions;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
-import org.easymock.EasyMockSupport;
 import org.junit.Test;
 
 import javax.security.auth.callback.Callback;
@@ -39,7 +38,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class OAuthBearerSaslClientTest extends EasyMockSupport {
+public class OAuthBearerSaslClientTest {
 
     private static final Map<String, String> TEST_PROPERTIES = new LinkedHashMap<String, String>() {
         {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
index 9c62bef427a..cc0b9835073 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -19,6 +19,9 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -42,8 +45,8 @@
 import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.easymock.EasyMock;
 import org.junit.Test;
+import org.mockito.InOrder;
 
 public class ExpiringCredentialRefreshingLoginTest {
     private static final Configuration EMPTY_WILDCARD_CONFIGURATION;
@@ -257,24 +260,8 @@ public void testRefresh() throws Exception {
         for (int numExpectedRefreshes : new int[] {0, 1, 2}) {
             for (boolean clientReloginAllowedBeforeLogout : new boolean[] {true, false}) {
                 Subject subject = new Subject();
-                /*
-                 * Create a mock and record the fact that we expect login() to be invoked
-                 * followed by getSubject() and then ultimately followed by numExpectedRefreshes
-                 * pairs of either login()/logout() or logout()/login() calls
-                 */
-                final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
-                mockLoginContext.login();
-                EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
-                for (int i = 0; i < numExpectedRefreshes; ++i) {
-                    if (clientReloginAllowedBeforeLogout) {
-                        mockLoginContext.login();
-                        mockLoginContext.logout();
-                    } else {
-                        mockLoginContext.logout();
-                        mockLoginContext.login();
-                    }
-                }
-                EasyMock.replay(mockLoginContext);
+                final LoginContext mockLoginContext = mock(LoginContext.class);
+                when(mockLoginContext.getSubject()).thenReturn(subject);
 
                 MockTime mockTime = new MockTime();
                 long startMs = mockTime.milliseconds();
@@ -335,6 +322,23 @@ public void testRefresh() throws Exception {
                     assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs);
                 }
                 assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+                /*
+                 * We expect login() to be invoked followed by getSubject() and then ultimately followed by
+                 * numExpectedRefreshes pairs of either login()/logout() or logout()/login() calls
+                 */
+                InOrder inOrder = inOrder(mockLoginContext);
+                inOrder.verify(mockLoginContext).login();
+                inOrder.verify(mockLoginContext).getSubject();
+                for (int i = 0; i < numExpectedRefreshes; ++i) {
+                    if (clientReloginAllowedBeforeLogout) {
+                        inOrder.verify(mockLoginContext).login();
+                        inOrder.verify(mockLoginContext).logout();
+                    } else {
+                        inOrder.verify(mockLoginContext).logout();
+                        inOrder.verify(mockLoginContext).login();
+                    }
+                }
             }
         }
     }
@@ -343,20 +347,9 @@ public void testRefresh() throws Exception {
     public void testRefreshWithExpirationSmallerThanConfiguredBuffers() throws Exception {
         int numExpectedRefreshes = 1;
         boolean clientReloginAllowedBeforeLogout = true;
+        final LoginContext mockLoginContext = mock(LoginContext.class);
         Subject subject = new Subject();
-        /*
-         * Create a mock and record the fact that we expect login() to be invoked
-         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
-         * pairs of login()/logout() calls
-         */
-        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
-        mockLoginContext.login();
-        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
-        for (int i = 0; i < numExpectedRefreshes; ++i) {
-            mockLoginContext.login();
-            mockLoginContext.logout();
-        }
-        EasyMock.replay(mockLoginContext);
+        when(mockLoginContext.getSubject()).thenReturn(subject);
 
         MockTime mockTime = new MockTime();
         long startMs = mockTime.milliseconds();
@@ -419,6 +412,13 @@ public void testRefreshWithExpirationSmallerThanConfiguredBuffers() throws Excep
             assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs);
         }
         assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+        InOrder inOrder = inOrder(mockLoginContext);
+        inOrder.verify(mockLoginContext).login();
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            inOrder.verify(mockLoginContext).login();
+            inOrder.verify(mockLoginContext).logout();
+        }
     }
 
     @Test
@@ -426,19 +426,8 @@ public void testRefreshWithMinPeriodIntrusion() throws Exception {
         int numExpectedRefreshes = 1;
         boolean clientReloginAllowedBeforeLogout = true;
         Subject subject = new Subject();
-        /*
-         * Create a mock and record the fact that we expect login() to be invoked
-         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
-         * pairs of login()/logout() calls
-         */
-        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
-        mockLoginContext.login();
-        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
-        for (int i = 0; i < numExpectedRefreshes; ++i) {
-            mockLoginContext.login();
-            mockLoginContext.logout();
-        }
-        EasyMock.replay(mockLoginContext);
+        final LoginContext mockLoginContext = mock(LoginContext.class);
+        when(mockLoginContext.getSubject()).thenReturn(subject);
 
         MockTime mockTime = new MockTime();
         long startMs = mockTime.milliseconds();
@@ -504,6 +493,13 @@ public void testRefreshWithMinPeriodIntrusion() throws Exception {
                     waiter.get().longValue() - startMs);
         }
         assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+        InOrder inOrder = inOrder(mockLoginContext);
+        inOrder.verify(mockLoginContext).login();
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            inOrder.verify(mockLoginContext).login();
+            inOrder.verify(mockLoginContext).logout();
+        }
     }
 
     @Test
@@ -511,19 +507,8 @@ public void testRefreshWithPreExpirationBufferIntrusion() throws Exception {
         int numExpectedRefreshes = 1;
         boolean clientReloginAllowedBeforeLogout = true;
         Subject subject = new Subject();
-        /*
-         * Create a mock and record the fact that we expect login() to be invoked
-         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
-         * pairs of login()/logout() calls
-         */
-        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
-        mockLoginContext.login();
-        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
-        for (int i = 0; i < numExpectedRefreshes; ++i) {
-            mockLoginContext.login();
-            mockLoginContext.logout();
-        }
-        EasyMock.replay(mockLoginContext);
+        final LoginContext mockLoginContext = mock(LoginContext.class);
+        when(mockLoginContext.getSubject()).thenReturn(subject);
 
         MockTime mockTime = new MockTime();
         long startMs = mockTime.milliseconds();
@@ -588,6 +573,13 @@ public void testRefreshWithPreExpirationBufferIntrusion() throws Exception {
                     waiter.get().longValue() - startMs);
         }
         assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+        InOrder inOrder = inOrder(mockLoginContext);
+        inOrder.verify(mockLoginContext).login();
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            inOrder.verify(mockLoginContext).login();
+            inOrder.verify(mockLoginContext).logout();
+        }
     }
 
     private static List<KafkaFutureImpl<Long>> addWaiters(MockScheduler mockScheduler, long refreshEveryMillis,
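
In these refresh tests the strict-mock expectation loop moves wholesale to the end of the
method: the same for-loop that EasyMock used to record login()/logout() pairs now drives
InOrder verification. A minimal sketch of loop-driven ordering checks against a mocked
LoginContext, with the interactions simulated inline:

    import javax.security.auth.login.LoginContext;
    import javax.security.auth.login.LoginException;

    import org.mockito.InOrder;

    import static org.mockito.Mockito.inOrder;
    import static org.mockito.Mockito.mock;

    public class RefreshOrderSketch {
        public static void main(String[] args) throws LoginException {
            LoginContext context = mock(LoginContext.class);
            int refreshes = 2;

            context.login();                       // initial login
            for (int i = 0; i < refreshes; i++) {  // simulated refresh cycles
                context.login();
                context.logout();
            }

            InOrder inOrder = inOrder(context);
            inOrder.verify(context).login();
            for (int i = 0; i < refreshes; i++) {
                inOrder.verify(context).login();   // each verify consumes one call
                inOrder.verify(context).logout();
            }
        }
    }
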
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index b258a342164..d5029b6ed2f 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -17,9 +17,8 @@
 package org.apache.kafka.common.utils;
 
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
 import org.junit.Test;
+import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.Closeable;
 import java.io.DataOutputStream;
@@ -34,6 +33,8 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.utils.Utils.formatAddress;
 import static org.apache.kafka.common.utils.Utils.formatBytes;
@@ -45,6 +46,12 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class UtilsTest {
 
@@ -324,17 +331,15 @@ public void testReadFullyOrFailWithRealFile() throws IOException {
      */
    @Test
    public void testReadFullyOrFailWithPartialFileChannelReads() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
         final int bufferSize = 100;
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        StringBuilder expectedBufferContent = new StringBuilder();
-        fileChannelMockExpectReadWithRandomBytes(channelMock, expectedBufferContent, bufferSize);
-        EasyMock.replay(channelMock);
+        String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
         Utils.readFullyOrFail(channelMock, buffer, 0L, "test");
-        assertEquals("The buffer should be populated correctly", expectedBufferContent.toString(),
+        assertEquals("The buffer should be populated correctly", expectedBufferContent,
                 new String(buffer.array()));
         assertFalse("The buffer should be filled", buffer.hasRemaining());
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).read(any(), anyLong());
     }
 
     /**
@@ -343,73 +348,62 @@ public void testReadFullyOrFailWithPartialFileChannelReads() throws IOException
      */
     @Test
     public void testReadFullyWithPartialFileChannelReads() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
         final int bufferSize = 100;
-        StringBuilder expectedBufferContent = new StringBuilder();
-        fileChannelMockExpectReadWithRandomBytes(channelMock, expectedBufferContent, bufferSize);
-        EasyMock.replay(channelMock);
+        String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
         Utils.readFully(channelMock, buffer, 0L);
-        assertEquals("The buffer should be populated correctly.", expectedBufferContent.toString(),
+        assertEquals("The buffer should be populated correctly.", expectedBufferContent,
                 new String(buffer.array()));
         assertFalse("The buffer should be filled", buffer.hasRemaining());
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).read(any(), anyLong());
     }
 
     @Test
     public void testReadFullyIfEofIsReached() throws IOException {
-        final FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        final FileChannel channelMock = mock(FileChannel.class);
         final int bufferSize = 100;
         final String fileChannelContent = "abcdefghkl";
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        EasyMock.expect(channelMock.size()).andReturn((long) fileChannelContent.length());
-        EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() {
-            @Override
-            public Integer answer() {
-                ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
-                buffer.put(fileChannelContent.getBytes());
-                return -1;
-            }
+        when(channelMock.read(any(), anyLong())).then(invocation -> {
+            ByteBuffer bufferArg = invocation.getArgument(0);
+            bufferArg.put(fileChannelContent.getBytes());
+            return -1;
         });
-        EasyMock.replay(channelMock);
         Utils.readFully(channelMock, buffer, 0L);
         assertEquals("abcdefghkl", new String(buffer.array(), 0, 
buffer.position()));
-        assertEquals(buffer.position(), channelMock.size());
+        assertEquals(fileChannelContent.length(), buffer.position());
         assertTrue(buffer.hasRemaining());
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).read(any(), anyLong());
     }
 
     /**
      * Expectation setter for multiple reads where each one reads random bytes to the buffer.
      *
      * @param channelMock           The mocked FileChannel object
-     * @param expectedBufferContent buffer that will be updated to contain the expected buffer content after each
-     *                              `FileChannel.read` invocation
      * @param bufferSize            The buffer size
+     * @return                      Expected buffer string
      * @throws IOException          If an I/O error occurs
      */
-    private void fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
-                                                          final StringBuilder expectedBufferContent,
-                                                          final int bufferSize) throws IOException {
+    private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
+                                                            final int bufferSize) throws IOException {
         final int step = 20;
         final Random random = new Random();
         int remainingBytes = bufferSize;
+        OngoingStubbing<Integer> when = when(channelMock.read(any(), anyLong()));
+        StringBuilder expectedBufferContent = new StringBuilder();
         while (remainingBytes > 0) {
-            final int mockedBytesRead = remainingBytes < step ? remainingBytes : random.nextInt(step);
-            final StringBuilder sb = new StringBuilder();
-            EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() {
-                @Override
-                public Integer answer() {
-                    ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
-                    for (int i = 0; i < mockedBytesRead; i++)
-                        sb.append("a");
-                    buffer.put(sb.toString().getBytes());
-                    expectedBufferContent.append(sb);
-                    return mockedBytesRead;
-                }
+            final int bytesRead = remainingBytes < step ? remainingBytes : random.nextInt(step);
+            final String stringRead = IntStream.range(0, bytesRead).mapToObj(i -> "a").collect(Collectors.joining());
+            expectedBufferContent.append(stringRead);
+            when = when.then(invocation -> {
+                ByteBuffer buffer = invocation.getArgument(0);
+                buffer.put(stringRead.getBytes());
+                return bytesRead;
             });
-            remainingBytes -= mockedBytesRead;
+            remainingBytes -= bytesRead;
         }
+        return expectedBufferContent.toString();
     }
 
     private static class TestCloseable implements Closeable {
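
The helper rewritten above uses a less common trick: holding on to the OngoingStubbing so
answers can be appended in a loop, one per expected read(). A minimal sketch of building a
variable-length answer chain the same way, on a mocked java.util.function.IntSupplier:

    import java.util.function.IntSupplier;

    import org.mockito.stubbing.OngoingStubbing;

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class ChainedStubSketch {
        public static void main(String[] args) {
            IntSupplier supplier = mock(IntSupplier.class);
            // Append one answer per iteration; the i-th call returns i.
            OngoingStubbing<Integer> stubbing = when(supplier.getAsInt());
            for (int i = 0; i < 3; i++) {
                stubbing = stubbing.thenReturn(i);
            }
            System.out.println(supplier.getAsInt()); // prints "0"
            System.out.println(supplier.getAsInt()); // prints "1"
            System.out.println(supplier.getAsInt()); // prints "2"
        }
    }
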
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 23fc68aaba5..9ebf9e2261d 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -74,6 +74,7 @@ versions += [
   lz4: "1.5.0",
   mavenArtifact: "3.5.4",
   metrics: "2.2.0",
+  mockito: "2.23.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
   powermock: "2.0.0-beta.5",
   reflections: "0.9.11",
@@ -125,6 +126,7 @@ libs += [
   log4j: "log4j:log4j:$versions.log4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
+  mockitoCore: "org.mockito:mockito-core:$versions.mockito",
   powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
  powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock",
   reflections: "org.reflections:reflections:$versions.reflections",


 


> Replace EasyMock and PowerMock with Mockito in clients module
> -------------------------------------------------------------
>
>                 Key: KAFKA-7439
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7439
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ismael Juma
>            Assignee: Ismael Juma
>            Priority: Major
>
> See parent task for details.


