[ https://issues.apache.org/jira/browse/KAFKA-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644201#comment-16644201 ]
ASF GitHub Bot commented on KAFKA-7439: --------------------------------------- lindong28 closed pull request #5691: KAFKA-7439: Replace EasyMock and PowerMock with Mockito in clients module URL: https://github.com/apache/kafka/pull/5691 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 47fa18620fd..d230c115a83 100644 --- a/build.gradle +++ b/build.gradle @@ -829,9 +829,7 @@ project(':clients') { testCompile libs.bcpkix testCompile libs.junit - testCompile libs.easymock - testCompile libs.powermockJunit4 - testCompile libs.powermockEasymock + testCompile libs.mockitoCore testRuntime libs.slf4jlog4j testRuntime libs.jacksonDatabind diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index bd5c11fc579..7810a3e8673 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -28,6 +28,7 @@ <allow pkg="org.slf4j" /> <allow pkg="org.junit" /> <allow pkg="org.hamcrest" /> + <allow pkg="org.mockito" /> <allow pkg="org.easymock" /> <allow pkg="org.powermock" /> <allow pkg="java.security" /> diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 1413eacacce..0abb5c45ba6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -50,7 +50,7 @@ * is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly * manage topics while producers rely on topic expiry to limit the refresh set. */ -public final class Metadata implements Closeable { +public class Metadata implements Closeable { private static final Logger log = LoggerFactory.getLogger(Metadata.class); diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index e4ba19779e6..b2098bfa065 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -595,6 +595,8 @@ public void close() { @Override public Node leastLoadedNode(long now) { List<Node> nodes = this.metadataUpdater.fetchNodes(); + if (nodes.isEmpty()) + throw new IllegalStateException("There are no nodes in the Kafka cluster"); int inflight = Integer.MAX_VALUE; Node found = null; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e249c12d5cc..316b024e996 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -252,7 +252,6 @@ private final Serializer<V> valueSerializer; private final ProducerConfig producerConfig; private final long maxBlockTimeMs; - private final int requestTimeoutMs; private final ProducerInterceptors<K, V> interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; @@ -269,7 +268,7 @@ * */ public KafkaProducer(final Map<String, Object> configs) { - this(new ProducerConfig(configs), null, null, null, null); + this(new ProducerConfig(configs), null, null, null, null, null, Time.SYSTEM); } /** @@ -287,10 +286,7 @@ public KafkaProducer(final Map<String, Object> configs) { */ public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) { this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)), - keySerializer, - valueSerializer, - null, - null); + keySerializer, valueSerializer, null, null, null, Time.SYSTEM); } /** @@ -301,7 +297,7 @@ public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, S * @param properties The producer configs */ public KafkaProducer(Properties properties) { - this(new ProducerConfig(properties), null, null, null, null); + this(new ProducerConfig(properties), null, null, null, null, null, Time.SYSTEM); } /** @@ -317,20 +313,22 @@ public KafkaProducer(Properties properties) { */ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) { this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), - keySerializer, valueSerializer, null, null); + keySerializer, valueSerializer, null, null, null, Time.SYSTEM); } - @SuppressWarnings("unchecked") // visible for testing + @SuppressWarnings("unchecked") KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, Metadata metadata, - KafkaClient kafkaClient) { + KafkaClient kafkaClient, + ProducerInterceptors interceptors, + Time time) { try { Map<String, Object> userProvidedConfigs = config.originals(); this.producerConfig = config; - this.time = Time.SYSTEM; + this.time = time; String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); @@ -356,7 +354,6 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); - ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); if (keySerializer == null) { @@ -378,20 +375,21 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali // load interceptors and make sure they get clientId userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, - ProducerInterceptor.class); - this.interceptors = new ProducerInterceptors<>(interceptorList); - ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters); + ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false); + List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); + if (interceptors != null) + this.interceptors = interceptors; + else + this.interceptors = new ProducerInterceptors<>(interceptorList); + ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, + valueSerializer, interceptorList, reporters); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); - this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.transactionManager = configureTransactionState(config, logContext, log); - int retries = configureRetries(config, transactionManager != null, log); - int maxInflightRequests = configureInflightRequests(config, transactionManager != null); - short acks = configureAcks(config, transactionManager != null, log); int deliveryTimeoutMs = configureDeliveryTimeout(config, log); this.apiVersions = new ApiVersions(); @@ -413,44 +411,13 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali } else { this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, true, clusterResourceListeners); - this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); + this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds()); } - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); - Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics); - KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient( - new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), - this.metrics, time, "producer", channelBuilder, logContext), - this.metadata, - clientId, - maxInflightRequests, - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), - config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), - this.requestTimeoutMs, - time, - true, - apiVersions, - throttleTimeSensor, - logContext); - this.sender = new Sender(logContext, - client, - this.metadata, - this.accumulator, - maxInflightRequests == 1, - config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), - acks, - retries, - metricsRegistry.senderMetrics, - Time.SYSTEM, - this.requestTimeoutMs, - config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), - this.transactionManager, - apiVersions); + this.errors = this.metrics.sensor("errors"); + this.sender = newSender(logContext, kafkaClient, this.metadata); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); - this.errors = this.metrics.sensor("errors"); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka producer started"); @@ -462,6 +429,47 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali } } + // visible for testing + Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) { + int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null); + int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig); + ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); + Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics); + KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient( + new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), + this.metrics, time, "producer", channelBuilder, logContext), + metadata, + clientId, + maxInflightRequests, + producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), + producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), + requestTimeoutMs, + time, + true, + apiVersions, + throttleTimeSensor, + logContext); + int retries = configureRetries(producerConfig, transactionManager != null, log); + short acks = configureAcks(producerConfig, transactionManager != null, log); + return new Sender(logContext, + client, + metadata, + this.accumulator, + maxInflightRequests == 1, + producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), + acks, + retries, + metricsRegistry.senderMetrics, + time, + requestTimeoutMs, + producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), + this.transactionManager, + apiVersions); + } + private static int configureDeliveryTimeout(ProducerConfig config, Logger log) { int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index c5df2dacbca..22d747265d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -135,7 +135,7 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); - this.waitTime.record(timeNs, time.milliseconds()); + recordWaitTime(timeNs); } if (waitingTimeElapsed) { @@ -185,6 +185,11 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx return buffer; } + // Protected for testing + protected void recordWaitTime(long timeNs) { + this.waitTime.record(timeNs, time.milliseconds()); + } + /** * Allocate a buffer. If buffer allocation fails (e.g. because of OOM) then return the size count back to * available memory and signal the next waiter if it exists. diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7640377ac17..c50a85f06da 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -40,8 +40,6 @@ import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; @@ -149,7 +147,7 @@ public Sender(LogContext logContext, this.acks = acks; this.retries = retries; this.time = time; - this.sensors = new SenderMetrics(metricsRegistry); + this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time); this.requestTimeoutMs = requestTimeoutMs; this.retryBackoffMs = retryBackoffMs; this.apiVersions = apiVersions; @@ -811,7 +809,7 @@ public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) { /** * A collection of sensors for the sender */ - private class SenderMetrics { + private static class SenderMetrics { public final Sensor retrySensor; public final Sensor errorSensor; public final Sensor queueTimeSensor; @@ -822,9 +820,11 @@ public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) { public final Sensor maxRecordSizeSensor; public final Sensor batchSplitSensor; private final SenderMetricsRegistry metrics; + private final Time time; - public SenderMetrics(SenderMetricsRegistry metrics) { + public SenderMetrics(SenderMetricsRegistry metrics, Metadata metadata, KafkaClient client, Time time) { this.metrics = metrics; + this.time = time; this.batchSizeSensor = metrics.sensor("batch-size"); this.batchSizeSensor.add(metrics.batchSizeAvg, new Avg()); @@ -855,16 +855,9 @@ public SenderMetrics(SenderMetricsRegistry metrics) { this.maxRecordSizeSensor.add(metrics.recordSizeMax, new Max()); this.maxRecordSizeSensor.add(metrics.recordSizeAvg, new Avg()); - this.metrics.addMetric(metrics.requestsInFlight, new Measurable() { - public double measure(MetricConfig config, long now) { - return client.inFlightRequestCount(); - } - }); - metrics.addMetric(metrics.metadataAge, new Measurable() { - public double measure(MetricConfig config, long now) { - return (now - metadata.lastSuccessfulUpdate()) / 1000.0; - } - }); + this.metrics.addMetric(metrics.requestsInFlight, (config, now) -> client.inFlightRequestCount()); + this.metrics.addMetric(metrics.metadataAge, + (config, now) -> (now - metadata.lastSuccessfulUpdate()) / 1000.0); this.batchSplitSensor = metrics.sensor("batch-split-rate"); this.batchSplitSensor.add(new Meter(metrics.batchSplitRate, metrics.batchSplitTotal)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 2a3cbe0b72d..4ac5876c5f0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -74,7 +74,6 @@ import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -115,6 +114,11 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; public class KafkaConsumerTest { private final String topic = "test"; @@ -1846,18 +1850,16 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, } @Test + @SuppressWarnings("deprecation") public void testCloseWithTimeUnit() { - KafkaConsumer consumer = EasyMock.partialMockBuilder(KafkaConsumer.class) - .addMockedMethod("close", Duration.class).createMock(); - consumer.close(Duration.ofSeconds(1)); - EasyMock.expectLastCall(); - EasyMock.replay(consumer); + KafkaConsumer consumer = mock(KafkaConsumer.class); + doCallRealMethod().when(consumer).close(anyLong(), any()); consumer.close(1, TimeUnit.SECONDS); - EasyMock.verify(consumer); + verify(consumer).close(Duration.ofSeconds(1)); } @Test(expected = InvalidTopicException.class) - public void testSubscriptionOnInvalidTopic() throws Exception { + public void testSubscriptionOnInvalidTopic() { Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(); Node node = cluster.nodes().get(0); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 4494fd5bc02..8f6328d6a18 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -32,16 +32,19 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; import org.junit.Test; -import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ConsumerNetworkClientTest { @@ -148,69 +151,41 @@ public void testTimeoutUnsentRequest() { @Test public void doNotBlockIfPollConditionIsSatisfied() { - NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); + NetworkClient mockNetworkClient = mock(NetworkClient.class); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), mockNetworkClient, metadata, time, 100, 1000, Integer.MAX_VALUE); // expect poll, but with no timeout - EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(0L), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); - - EasyMock.replay(mockNetworkClient); - - consumerClient.poll(time.timer(Long.MAX_VALUE), new ConsumerNetworkClient.PollCondition() { - @Override - public boolean shouldBlock() { - return false; - } - }); - - EasyMock.verify(mockNetworkClient); + consumerClient.poll(time.timer(Long.MAX_VALUE), () -> false); + verify(mockNetworkClient).poll(eq(0L), anyLong()); } @Test public void blockWhenPollConditionNotSatisfied() { long timeout = 4000L; - NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); + NetworkClient mockNetworkClient = mock(NetworkClient.class); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), mockNetworkClient, metadata, time, 100, 1000, Integer.MAX_VALUE); - EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1); - EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); - - EasyMock.replay(mockNetworkClient); - - consumerClient.poll(time.timer(timeout), new ConsumerNetworkClient.PollCondition() { - @Override - public boolean shouldBlock() { - return true; - } - }); - - EasyMock.verify(mockNetworkClient); + when(mockNetworkClient.inFlightRequestCount()).thenReturn(1); + consumerClient.poll(time.timer(timeout), () -> true); + verify(mockNetworkClient).poll(eq(timeout), anyLong()); } @Test public void blockOnlyForRetryBackoffIfNoInflightRequests() { long retryBackoffMs = 100L; - NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); + NetworkClient mockNetworkClient = mock(NetworkClient.class); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), mockNetworkClient, metadata, time, retryBackoffMs, 1000, Integer.MAX_VALUE); - EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0); - EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); + when(mockNetworkClient.inFlightRequestCount()).thenReturn(0); - EasyMock.replay(mockNetworkClient); - - consumerClient.poll(time.timer(Long.MAX_VALUE), new ConsumerNetworkClient.PollCondition() { - @Override - public boolean shouldBlock() { - return true; - } - }); + consumerClient.poll(time.timer(Long.MAX_VALUE), () -> true); - EasyMock.verify(mockNetworkClient); + verify(mockNetworkClient).poll(eq(retryBackoffMs), anyLong()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index dc6fe9f52fc..77fcb517803 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -20,9 +20,11 @@ import java.util.HashSet; import java.util.Set; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.internals.ProducerInterceptors; +import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -39,6 +41,7 @@ import org.apache.kafka.common.serialization.ExtendedSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.MockMetricsReporter; @@ -46,15 +49,8 @@ import org.apache.kafka.test.MockProducerInterceptor; import org.apache.kafka.test.MockSerializer; import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.support.membermodification.MemberModifier; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.Arrays; import java.util.Collection; @@ -66,14 +62,23 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -@PowerMockIgnore("javax.management.*") public class KafkaProducerTest { @Test @@ -209,7 +214,7 @@ public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception { final Producer<String, String> producer = new KafkaProducer<>( new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), - new StringSerializer(), new StringSerializer(), metadata, client); + new StringSerializer(), new StringSerializer(), metadata, client, null, time); ExecutorService executor = Executors.newSingleThreadExecutor(); final AtomicReference<Exception> closeException = new AtomicReference<>(); @@ -271,17 +276,13 @@ public void testInvalidSocketReceiveBufferSize() { new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); } - @PrepareOnlyThisForTest(Metadata.class) @Test - public void testMetadataFetch() throws Exception { + public void testMetadataFetch() throws InterruptedException { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - Metadata metadata = PowerMock.createNiceMock(Metadata.class); - MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); - + ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), + new StringSerializer())); String topic = "topic"; - ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value"); Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000)); final Cluster emptyCluster = new Cluster(null, nodes, Collections.emptySet(), @@ -293,42 +294,48 @@ public void testMetadataFetch() throws Exception { Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)), Collections.emptySet(), Collections.emptySet()); + Metadata metadata = mock(Metadata.class); + + // Return empty cluster 4 times and cluster from then on + when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, cluster); - // Expect exactly one fetch for each attempt to refresh while topic metadata is not available - final int refreshAttempts = 5; - EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1); - EasyMock.expect(metadata.fetch()).andReturn(cluster).once(); - EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); - PowerMock.replay(metadata); + KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null, + metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) { + @Override + Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) { + // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer + return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000, true)); + } + }; + ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value"); producer.send(record); - PowerMock.verify(metadata); - // Expect exactly one fetch if topic metadata is available - PowerMock.reset(metadata); - EasyMock.expect(metadata.fetch()).andReturn(cluster).once(); - EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); - PowerMock.replay(metadata); + // One request update for each empty cluster returned + verify(metadata, times(4)).requestUpdate(); + verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); + verify(metadata, times(5)).fetch(); + + // Should not request update for subsequent `send` producer.send(record, null); - PowerMock.verify(metadata); + verify(metadata, times(4)).requestUpdate(); + verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); + verify(metadata, times(6)).fetch(); - // Expect exactly one fetch if topic metadata is available - PowerMock.reset(metadata); - EasyMock.expect(metadata.fetch()).andReturn(cluster).once(); - EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); - PowerMock.replay(metadata); + // Should not request update for subsequent `partitionsFor` producer.partitionsFor(topic); - PowerMock.verify(metadata); + verify(metadata, times(4)).requestUpdate(); + verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); + verify(metadata, times(7)).fetch(); + + producer.close(0, TimeUnit.MILLISECONDS); } - @PrepareOnlyThisForTest(Metadata.class) @Test public void testMetadataFetchOnStaleMetadata() throws Exception { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - Metadata metadata = PowerMock.createNiceMock(Metadata.class); - MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); - + ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), + new StringSerializer())); String topic = "topic"; ProducerRecord<String, String> initialRecord = new ProducerRecord<>(topic, "value"); // Create a record with a partition higher than the initial (outdated) partition range @@ -353,134 +360,145 @@ public void testMetadataFetchOnStaleMetadata() throws Exception { new PartitionInfo(topic, 2, null, null, null)), Collections.emptySet(), Collections.emptySet()); + Metadata metadata = mock(Metadata.class); + + AtomicInteger invocationCount = new AtomicInteger(0); + + // Return empty cluster 4 times, initialCluster 5 times and extendedCluster after that + when(metadata.fetch()).then(invocation -> { + invocationCount.incrementAndGet(); + if (invocationCount.get() > 9) + return extendedCluster; + else if (invocationCount.get() > 4) + return initialCluster; + return emptyCluster; + }); - // Expect exactly one fetch for each attempt to refresh while topic metadata is not available - final int refreshAttempts = 5; - EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1); - EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); - EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); - PowerMock.replay(metadata); + KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null, + metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) { + @Override + Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) { + // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer + return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000, true)); + } + }; producer.send(initialRecord); - PowerMock.verify(metadata); - // Expect exactly one fetch if topic metadata is available and records are still within range - PowerMock.reset(metadata); - EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); - EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); - PowerMock.replay(metadata); - producer.send(initialRecord, null); - PowerMock.verify(metadata); + // One request update for each empty cluster returned + verify(metadata, times(4)).requestUpdate(); + verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); + verify(metadata, times(5)).fetch(); + + // Should not request update if metadata is available and records are within range + producer.send(initialRecord); + verify(metadata, times(4)).requestUpdate(); + verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); + verify(metadata, times(6)).fetch(); - // Expect exactly two fetches if topic metadata is available but metadata response still returns + // One request update followed by exception if topic metadata is available but metadata response still returns // the same partition size (either because metadata are still stale at the broker too or because - // there weren't any partitions added in the first place). - PowerMock.reset(metadata); - EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); - EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); - EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); - PowerMock.replay(metadata); + // there weren't any partitions added in the first place) try { - producer.send(extendedRecord, null); + producer.send(extendedRecord); fail("Expected KafkaException to be raised"); } catch (KafkaException e) { // expected } - PowerMock.verify(metadata); - - // Expect exactly two fetches if topic metadata is available but outdated for the given record - PowerMock.reset(metadata); - EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); - EasyMock.expect(metadata.fetch()).andReturn(extendedCluster).once(); - EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); - PowerMock.replay(metadata); - producer.send(extendedRecord, null); - PowerMock.verify(metadata); + verify(metadata, times(5)).requestUpdate(); + verify(metadata, times(5)).awaitUpdate(anyInt(), anyLong()); + verify(metadata, times(8)).fetch(); + + // One request update if metadata is available but outdated for the given record + producer.send(extendedRecord); + verify(metadata, times(6)).requestUpdate(); + verify(metadata, times(6)).awaitUpdate(anyInt(), anyLong()); + verify(metadata, times(10)).fetch(); + + producer.close(0, TimeUnit.MILLISECONDS); } @Test - public void testTopicRefreshInMetadata() throws Exception { + public void testTopicRefreshInMetadata() throws InterruptedException { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000"); - KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); + ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), + new StringSerializer())); long refreshBackoffMs = 500L; long metadataExpireMs = 60000L; final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, true, new ClusterResourceListeners()); final Time time = new MockTime(); - MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); - MemberModifier.field(KafkaProducer.class, "time").set(producer, time); final String topic = "topic"; - - Thread t = new Thread(() -> { - long startTimeMs = System.currentTimeMillis(); - for (int i = 0; i < 10; i++) { - while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000) - Thread.yield(); - metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds()); - time.sleep(60 * 1000L); + try (KafkaProducer<String, String> producer = new KafkaProducer<>(config, null, null, metadata, + null, null, time)) { + + Thread t = new Thread(() -> { + long startTimeMs = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) { + while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000) + Thread.yield(); + metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds()); + time.sleep(60 * 1000L); + } + }); + t.start(); + try { + producer.partitionsFor(topic); + fail("Expect TimeoutException"); + } catch (TimeoutException e) { + // skip } - }); - t.start(); - try { - producer.partitionsFor(topic); - fail("Expect TimeoutException"); - } catch (TimeoutException e) { - // skip + t.join(); } - Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic)); + assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic)); } - @SuppressWarnings("unchecked") // safe as generic parameters won't vary - @PrepareOnlyThisForTest(Metadata.class) @Test - public void testHeadersWithExtendedClasses() throws Exception { + @Deprecated + public void testHeadersWithExtendedClasses() { doTestHeaders(ExtendedSerializer.class); } - @SuppressWarnings("unchecked") - @PrepareOnlyThisForTest(Metadata.class) @Test - public void testHeaders() throws Exception { + public void testHeaders() { doTestHeaders(Serializer.class); } - private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) throws Exception { + private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - T keySerializer = PowerMock.createNiceMock(serializerClassToMock); - T valueSerializer = PowerMock.createNiceMock(serializerClassToMock); - - KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer); - Metadata metadata = PowerMock.createNiceMock(Metadata.class); - MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); + @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class + Serializer<String> keySerializer = mock(serializerClassToMock); + @SuppressWarnings("unchecked") + Serializer<String> valueSerializer = mock(serializerClassToMock); + ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, keySerializer, + valueSerializer)); String topic = "topic"; final Cluster cluster = new Cluster( "dummy", Collections.singletonList(new Node(0, "host1", 1000)), - Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)), + Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet()); + Metadata metadata = new Metadata(0, 90000, true); + metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds()); + KafkaProducer<String, String> producer = new KafkaProducer<>(config, keySerializer, valueSerializer, + metadata, null, null, Time.SYSTEM); - EasyMock.expect(metadata.fetch()).andReturn(cluster).anyTimes(); - - PowerMock.replay(metadata); + when(keySerializer.serialize(any(), any(), any())).then(invocation -> + invocation.<String>getArgument(2).getBytes()); + when(valueSerializer.serialize(any(), any(), any())).then(invocation -> + invocation.<String>getArgument(2).getBytes()); String value = "value"; - - ProducerRecord<String, String> record = new ProducerRecord<>(topic, value); - EasyMock.expect(keySerializer.serialize(topic, record.headers(), null)).andReturn(null).once(); - EasyMock.expect(valueSerializer.serialize(topic, record.headers(), value)).andReturn(value.getBytes()).once(); - - PowerMock.replay(keySerializer); - PowerMock.replay(valueSerializer); - + String key = "key"; + ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); //ensure headers can be mutated pre send. record.headers().add(new RecordHeader("test", "header2".getBytes())); - producer.send(record, null); //ensure headers are closed and cannot be mutated post send @@ -492,11 +510,12 @@ public void testHeaders() throws Exception { } //ensure existing headers are not changed, and last header for key is still original value - assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes())); + assertArrayEquals(record.headers().lastHeader("test").value(), "header2".getBytes()); - PowerMock.verify(valueSerializer); - PowerMock.verify(keySerializer); + verify(valueSerializer).serialize(topic, record.headers(), value); + verify(keySerializer).serialize(topic, record.headers(), key); + producer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -522,40 +541,38 @@ public void testMetricConfigRecordingLevel() { } } - @PrepareOnlyThisForTest(Metadata.class) @Test - public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception { + public void testInterceptorPartitionSetOnTooLargeRecord() { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1"); + ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), + new StringSerializer())); String topic = "topic"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value"); - KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), - new StringSerializer()); - Metadata metadata = PowerMock.createNiceMock(Metadata.class); - MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); + Metadata metadata = new Metadata(0, 90000, true); final Cluster cluster = new Cluster( "dummy", Collections.singletonList(new Node(0, "host1", 1000)), - Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)), + Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet()); - EasyMock.expect(metadata.fetch()).andReturn(cluster).once(); + metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds()); - // Mock interceptors field @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class - ProducerInterceptors<String, String> interceptors = PowerMock.createMock(ProducerInterceptors.class); - EasyMock.expect(interceptors.onSend(record)).andReturn(record); - interceptors.onSendError(EasyMock.eq(record), EasyMock.notNull(), EasyMock.notNull()); - EasyMock.expectLastCall(); - MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors); - - PowerMock.replay(metadata); - EasyMock.replay(interceptors); + ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class); + KafkaProducer<String, String> producer = new KafkaProducer<>(config, null, null, + metadata, null, interceptors, Time.SYSTEM); + + when(interceptors.onSend(any())).then(invocation -> invocation.getArgument(0)); + producer.send(record); - EasyMock.verify(interceptors); + verify(interceptors).onSend(record); + verify(interceptors).onSendError(eq(record), notNull(), notNull()); + + producer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -577,7 +594,7 @@ public void testInitTransactionTimeout() { props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); - Time time = new MockTime(); + Time time = Time.SYSTEM; Cluster cluster = TestUtils.singletonCluster("topic", 1); Node node = cluster.nodes().get(0); @@ -589,7 +606,7 @@ public void testInitTransactionTimeout() { try (Producer<String, String> producer = new KafkaProducer<>( new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), - new StringSerializer(), new StringSerializer(), metadata, client)) { + null, null, metadata, client, null, time)) { producer.initTransactions(); fail("initTransactions() should have raised TimeoutException"); } @@ -614,7 +631,7 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() { Producer<String, String> producer = new KafkaProducer<>( new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), - new StringSerializer(), new StringSerializer(), metadata, client); + null, null, metadata, client, null, time); try { producer.initTransactions(); } catch (TimeoutException e) { @@ -630,7 +647,6 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() { @Test public void testSendToInvalidTopic() throws Exception { - Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000"); @@ -647,9 +663,9 @@ public void testSendToInvalidTopic() throws Exception { Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig( ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), - new StringSerializer(), new StringSerializer(), metadata, client); + null, null, metadata, client, null, time); - String invalidTopicName = "topic abc"; // Invalid topic name due to space + String invalidTopicName = "topic abc"; // Invalid topic name due to space ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka"); Set<String> invalidTopic = new HashSet<>(); @@ -665,8 +681,11 @@ public void testSendToInvalidTopic() throws Exception { Future<RecordMetadata> future = producer.send(record); - assertEquals("Cluster has incorrect invalid topic list.", metaDataUpdateResponseCluster.invalidTopics(), metadata.fetch().invalidTopics()); + assertEquals("Cluster has incorrect invalid topic list", metaDataUpdateResponseCluster.invalidTopics(), + metadata.fetch().invalidTopics()); TestUtils.assertFutureError(future, InvalidTopicException.class); + + producer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -689,7 +708,7 @@ public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException Producer<String, String> producer = new KafkaProducer<>( new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), - new StringSerializer(), new StringSerializer(), metadata, client); + new StringSerializer(), new StringSerializer(), metadata, client, null, time); ExecutorService executor = Executors.newSingleThreadExecutor(); final AtomicReference<Exception> sendException = new AtomicReference<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 23fc5411b3c..ce74eb1f0f1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -16,11 +16,8 @@ */ package org.apache.kafka.clients.producer.internals; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestUtils; @@ -36,25 +33,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.anyLong; -import static org.easymock.EasyMock.anyDouble; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.anyString; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; - -@RunWith(PowerMockRunner.class) public class BufferPoolTest { private final MockTime time = new MockTime(); private final Metrics metrics = new Metrics(time); @@ -241,36 +229,25 @@ public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty assertEquals(pool.queued(), 0); } - - @PrepareForTest({Sensor.class, MetricName.class}) + @Test public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception { - Metrics mockedMetrics = createNiceMock(Metrics.class); - Sensor mockedSensor = createNiceMock(Sensor.class); - MetricName metricName = createNiceMock(MetricName.class); - MetricName rateMetricName = createNiceMock(MetricName.class); - MetricName totalMetricName = createNiceMock(MetricName.class); - - expect(mockedMetrics.sensor(BufferPool.WAIT_TIME_SENSOR_NAME)).andReturn(mockedSensor); + BufferPool bufferPool = spy(new BufferPool(2, 1, new Metrics(), time, metricGroup)); + doThrow(new OutOfMemoryError()).when(bufferPool).recordWaitTime(anyLong()); - mockedSensor.record(anyDouble(), anyLong()); - expectLastCall().andThrow(new OutOfMemoryError()); - expect(mockedMetrics.metricName(anyString(), eq(metricGroup), anyString())).andReturn(metricName); - expect(mockedSensor.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName))).andReturn(true); - - replay(mockedMetrics, mockedSensor, metricName); - - BufferPool bufferPool = new BufferPool(2, 1, mockedMetrics, time, metricGroup); bufferPool.allocate(1, 0); try { bufferPool.allocate(2, 1000); - assertTrue("Expected oom.", false); + fail("Expected oom."); } catch (OutOfMemoryError expected) { } assertEquals(1, bufferPool.availableMemory()); assertEquals(0, bufferPool.queued()); + assertEquals(1, bufferPool.unallocatedMemory()); //This shouldn't timeout bufferPool.allocate(1, 0); + + verify(bufferPool).recordWaitTime(anyLong()); } private static class BufferPoolAllocator implements Runnable { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index f93f3432bdd..f2a34f6c89c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -84,24 +84,23 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; -import static org.easymock.EasyMock.anyBoolean; -import static org.easymock.EasyMock.anyInt; -import static org.easymock.EasyMock.anyLong; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.geq; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; +import static org.mockito.AdditionalMatchers.geq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.spy; public class SenderTest { @@ -140,7 +139,8 @@ public void tearDown() { @Test public void testSimple() throws Exception { long offset = 0; - Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -2037,33 +2037,27 @@ public void testExpiredBatchDoesNotRetry() throws Exception { @Test public void testResetNextBatchExpiry() throws Exception { - MockClient delegateClient = new MockClient(time); - client = mock(MockClient.class); - expect(client.ready(anyObject(), anyLong())).andDelegateTo(delegateClient).anyTimes(); - expect( - client.newClientRequest( - anyString(), anyObject(), anyLong(), anyBoolean(), anyInt(), anyObject())) - .andDelegateTo(delegateClient).anyTimes(); - client.send(anyObject(), anyLong()); - expectLastCall().andDelegateTo(delegateClient).anyTimes(); - expect(client.poll(eq(0L), anyLong())).andDelegateTo(delegateClient).times(1); - expect(client.poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong())) - .andDelegateTo(delegateClient) - .times(1); - expect(client.poll(geq(1L), anyLong())).andDelegateTo(delegateClient).times(1); - replay(client); + client = spy(new MockClient(time)); setupWithTransactionState(null); - accumulator.append( - tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT); + accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, + MAX_BLOCK_TIMEOUT); sender.run(time.milliseconds()); sender.run(time.milliseconds()); time.setCurrentTimeMs(time.milliseconds() + accumulator.getDeliveryTimeoutMs() + 1); sender.run(time.milliseconds()); - verify(client); + InOrder inOrder = inOrder(client); + inOrder.verify(client, atLeastOnce()).ready(any(), anyLong()); + inOrder.verify(client, atLeastOnce()).newClientRequest(anyString(), any(), anyLong(), anyBoolean(), anyInt(), + any()); + inOrder.verify(client, atLeastOnce()).send(any(), anyLong()); + inOrder.verify(client).poll(eq(0L), anyLong()); + inOrder.verify(client).poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong()); + inOrder.verify(client).poll(geq(1L), anyLong()); + } private class MatchingBufferPool extends BufferPool { diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java index c2b89fe6882..27daf0face8 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext; import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.easymock.EasyMock; import org.junit.Test; import java.net.InetAddress; @@ -35,13 +34,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; public class ChannelBuildersTest { @Test public void testCreateOldPrincipalBuilder() throws Exception { - TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); - Authenticator authenticator = EasyMock.mock(Authenticator.class); + TransportLayer transportLayer = mock(TransportLayer.class); + Authenticator authenticator = mock(Authenticator.class); Map<String, Object> configs = new HashMap<>(); configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index cef7c7fae49..6cf75861122 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import org.easymock.IMocksControl; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -53,15 +52,17 @@ import java.util.Set; import java.util.Optional; -import static org.easymock.EasyMock.createControl; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** @@ -561,31 +562,20 @@ private Thread createSender(InetSocketAddress serverAddress, byte[] payload) { */ @Test public void testConnectDisconnectDuringInSinglePoll() throws Exception { - IMocksControl control = createControl(); - // channel is connected, not ready and it throws an exception during prepare - KafkaChannel kafkaChannel = control.createMock(KafkaChannel.class); - expect(kafkaChannel.id()).andStubReturn("1"); - expect(kafkaChannel.socketDescription()).andStubReturn(""); - expect(kafkaChannel.state()).andStubReturn(ChannelState.NOT_CONNECTED); - expect(kafkaChannel.finishConnect()).andReturn(true); - expect(kafkaChannel.isConnected()).andStubReturn(true); - // record void method invocations - kafkaChannel.disconnect(); - kafkaChannel.close(); - expect(kafkaChannel.ready()).andReturn(false).anyTimes(); - // prepare throws an exception - kafkaChannel.prepare(); - expectLastCall().andThrow(new IOException()); - - SelectionKey selectionKey = control.createMock(SelectionKey.class); - expect(kafkaChannel.selectionKey()).andStubReturn(selectionKey); - expect(selectionKey.channel()).andReturn(SocketChannel.open()); - expect(selectionKey.readyOps()).andStubReturn(SelectionKey.OP_CONNECT); - selectionKey.cancel(); - expectLastCall(); - - control.replay(); + KafkaChannel kafkaChannel = mock(KafkaChannel.class); + when(kafkaChannel.id()).thenReturn("1"); + when(kafkaChannel.socketDescription()).thenReturn(""); + when(kafkaChannel.state()).thenReturn(ChannelState.NOT_CONNECTED); + when(kafkaChannel.finishConnect()).thenReturn(true); + when(kafkaChannel.isConnected()).thenReturn(true); + when(kafkaChannel.ready()).thenReturn(false); + doThrow(new IOException()).when(kafkaChannel).prepare(); + + SelectionKey selectionKey = mock(SelectionKey.class); + when(kafkaChannel.selectionKey()).thenReturn(selectionKey); + when(selectionKey.channel()).thenReturn(SocketChannel.open()); + when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_CONNECT); selectionKey.attach(kafkaChannel); Set<SelectionKey> selectionKeys = Utils.mkSet(selectionKey); @@ -595,7 +585,10 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception { assertTrue(selector.disconnected().containsKey(kafkaChannel.id())); assertNull(selectionKey.attachment()); - control.verify(); + verify(kafkaChannel, atLeastOnce()).ready(); + verify(kafkaChannel).disconnect(); + verify(kafkaChannel).close(); + verify(selectionKey).cancel(); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 77de33fcea7..4b2b3618a4a 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -23,8 +23,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; -import org.easymock.EasyMockSupport; import org.junit.Before; import org.junit.Test; @@ -45,8 +43,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -public class FileRecordsTest extends EasyMockSupport { +public class FileRecordsTest { private byte[][] values = new byte[][] { "abcd".getBytes(), @@ -66,10 +70,7 @@ public void setup() throws IOException { public void testAppendProtectsFromOverflow() throws Exception { File fileMock = mock(File.class); FileChannel fileChannelMock = mock(FileChannel.class); - EasyMock.expect(fileChannelMock.size()).andStubReturn((long) Integer.MAX_VALUE); - EasyMock.expect(fileChannelMock.position(Integer.MAX_VALUE)).andReturn(fileChannelMock); - - replayAll(); + when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE); FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false); append(records, values); @@ -79,9 +80,7 @@ public void testAppendProtectsFromOverflow() throws Exception { public void testOpenOversizeFile() throws Exception { File fileMock = mock(File.class); FileChannel fileChannelMock = mock(FileChannel.class); - EasyMock.expect(fileChannelMock.size()).andStubReturn(Integer.MAX_VALUE + 5L); - - replayAll(); + when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L); new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false); } @@ -262,16 +261,16 @@ public void testTruncate() throws IOException { */ @Test public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException { - FileChannel channelMock = EasyMock.createMock(FileChannel.class); + FileChannel channelMock = mock(FileChannel.class); - EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce(); - EasyMock.expect(channelMock.position(42L)).andReturn(null); - EasyMock.replay(channelMock); + when(channelMock.size()).thenReturn(42L); + when(channelMock.position(42L)).thenReturn(null); FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); fileRecords.truncateTo(42); - EasyMock.verify(channelMock); + verify(channelMock, atLeastOnce()).size(); + verify(channelMock, times(0)).truncate(anyLong()); } /** @@ -280,11 +279,9 @@ public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException { */ @Test public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOException { - FileChannel channelMock = EasyMock.createMock(FileChannel.class); + FileChannel channelMock = mock(FileChannel.class); - EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce(); - EasyMock.expect(channelMock.position(42L)).andReturn(null); - EasyMock.replay(channelMock); + when(channelMock.size()).thenReturn(42L); FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); @@ -295,7 +292,7 @@ public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOExcepti // expected } - EasyMock.verify(channelMock); + verify(channelMock, atLeastOnce()).size(); } /** @@ -303,17 +300,16 @@ public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOExcepti */ @Test public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException { - FileChannel channelMock = EasyMock.createMock(FileChannel.class); + FileChannel channelMock = mock(FileChannel.class); - EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce(); - EasyMock.expect(channelMock.position(42L)).andReturn(null).once(); - EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once(); - EasyMock.replay(channelMock); + when(channelMock.size()).thenReturn(42L); + when(channelMock.truncate(anyLong())).thenReturn(channelMock); FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); fileRecords.truncateTo(23); - EasyMock.verify(channelMock); + verify(channelMock, atLeastOnce()).size(); + verify(channelMock).truncate(23); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java index 25bcd5008b2..a05a8502bf1 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java @@ -20,11 +20,8 @@ import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; -import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.scram.internals.ScramMechanism; -import org.easymock.EasyMock; -import org.easymock.EasyMockSupport; import org.junit.Test; import javax.net.ssl.SSLSession; @@ -33,8 +30,13 @@ import java.security.Principal; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport { +public class DefaultKafkaPrincipalBuilderTest { @Test @SuppressWarnings("deprecation") @@ -43,12 +45,7 @@ public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws Exception Authenticator authenticator = mock(Authenticator.class); PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class); - EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator)) - .andReturn(new DummyPrincipal("foo")); - oldPrincipalBuilder.close(); - EasyMock.expectLastCall(); - - replayAll(); + when(oldPrincipalBuilder.buildPrincipal(any(), any())).thenReturn(new DummyPrincipal("foo")); DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer, oldPrincipalBuilder, null); @@ -59,15 +56,17 @@ public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws Exception assertEquals("foo", principal.getName()); builder.close(); - verifyAll(); + + verify(oldPrincipalBuilder).buildPrincipal(transportLayer, authenticator); + verify(oldPrincipalBuilder).close(); } @Test public void testReturnAnonymousPrincipalForPlaintext() throws Exception { - DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); - assertEquals(KafkaPrincipal.ANONYMOUS, builder.build( - new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()))); - builder.close(); + try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null)) { + assertEquals(KafkaPrincipal.ANONYMOUS, builder.build( + new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()))); + } } @Test @@ -78,12 +77,8 @@ public void testUseOldPrincipalBuilderForSslIfProvided() throws Exception { PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class); SSLSession session = mock(SSLSession.class); - EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator)) - .andReturn(new DummyPrincipal("foo")); - oldPrincipalBuilder.close(); - EasyMock.expectLastCall(); - - replayAll(); + when(oldPrincipalBuilder.buildPrincipal(any(), any())) + .thenReturn(new DummyPrincipal("foo")); DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer, oldPrincipalBuilder, null); @@ -94,16 +89,16 @@ public void testUseOldPrincipalBuilderForSslIfProvided() throws Exception { assertEquals("foo", principal.getName()); builder.close(); - verifyAll(); + + verify(oldPrincipalBuilder).buildPrincipal(transportLayer, authenticator); + verify(oldPrincipalBuilder).close(); } @Test public void testUseSessionPeerPrincipalForSsl() throws Exception { SSLSession session = mock(SSLSession.class); - EasyMock.expect(session.getPeerPrincipal()).andReturn(new DummyPrincipal("foo")); - - replayAll(); + when(session.getPeerPrincipal()).thenReturn(new DummyPrincipal("foo")); DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); @@ -113,17 +108,16 @@ public void testUseSessionPeerPrincipalForSsl() throws Exception { assertEquals("foo", principal.getName()); builder.close(); - verifyAll(); + + verify(session, atLeastOnce()).getPeerPrincipal(); } @Test public void testPrincipalBuilderScram() throws Exception { SaslServer server = mock(SaslServer.class); - EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName()); - EasyMock.expect(server.getAuthorizationID()).andReturn("foo"); - - replayAll(); + when(server.getMechanismName()).thenReturn(ScramMechanism.SCRAM_SHA_256.mechanismName()); + when(server.getAuthorizationID()).thenReturn("foo"); DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); @@ -133,7 +127,9 @@ public void testPrincipalBuilderScram() throws Exception { assertEquals("foo", principal.getName()); builder.close(); - verifyAll(); + + verify(server, atLeastOnce()).getMechanismName(); + verify(server, atLeastOnce()).getAuthorizationID(); } @Test @@ -141,12 +137,9 @@ public void testPrincipalBuilderGssapi() throws Exception { SaslServer server = mock(SaslServer.class); KerberosShortNamer kerberosShortNamer = mock(KerberosShortNamer.class); - EasyMock.expect(server.getMechanismName()).andReturn(SaslConfigs.GSSAPI_MECHANISM); - EasyMock.expect(server.getAuthorizationID()).andReturn("foo/h...@realm.com"); - EasyMock.expect(kerberosShortNamer.shortName(EasyMock.anyObject(KerberosName.class))) - .andReturn("foo"); - - replayAll(); + when(server.getMechanismName()).thenReturn(SaslConfigs.GSSAPI_MECHANISM); + when(server.getAuthorizationID()).thenReturn("foo/h...@realm.com"); + when(kerberosShortNamer.shortName(any())).thenReturn("foo"); DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer); @@ -156,7 +149,10 @@ public void testPrincipalBuilderGssapi() throws Exception { assertEquals("foo", principal.getName()); builder.close(); - verifyAll(); + + verify(server, atLeastOnce()).getMechanismName(); + verify(server, atLeastOnce()).getAuthorizationID(); + verify(kerberosShortNamer, atLeastOnce()).shortName(any()); } private static class DummyPrincipal implements Principal { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 1ae83eeace1..d62261ac363 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -28,9 +28,6 @@ import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.plain.PlainLoginModule; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; import org.junit.Test; import javax.security.auth.Subject; @@ -42,33 +39,32 @@ import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class SaslServerAuthenticatorTest { @Test(expected = InvalidReceiveException.class) public void testOversizeRequest() throws IOException { - TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); + TransportLayer transportLayer = mock(TransportLayer.class); Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); - final Capture<ByteBuffer> size = EasyMock.newCapture(); - EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() { - @Override - public Integer answer() { - size.getValue().putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1); - return 4; - } + when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { + invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1); + return 4; }); - - EasyMock.replay(transportLayer); - authenticator.authenticate(); + verify(transportLayer).read(any(ByteBuffer.class)); } @Test public void testUnexpectedRequestType() throws IOException { - TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); + TransportLayer transportLayer = mock(TransportLayer.class); Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); @@ -76,33 +72,23 @@ public void testUnexpectedRequestType() throws IOException { final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243); final Struct headerStruct = header.toStruct(); - final Capture<ByteBuffer> size = EasyMock.newCapture(); - EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() { - @Override - public Integer answer() { - size.getValue().putInt(headerStruct.sizeOf()); - return 4; - } - }); - - final Capture<ByteBuffer> payload = EasyMock.newCapture(); - EasyMock.expect(transportLayer.read(EasyMock.capture(payload))).andAnswer(new IAnswer<Integer>() { - @Override - public Integer answer() { - // serialize only the request header. the authenticator should not parse beyond this - headerStruct.writeTo(payload.getValue()); - return headerStruct.sizeOf(); - } + when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { + invocation.<ByteBuffer>getArgument(0).putInt(headerStruct.sizeOf()); + return 4; + }).then(invocation -> { + // serialize only the request header. the authenticator should not parse beyond this + headerStruct.writeTo(invocation.getArgument(0)); + return headerStruct.sizeOf(); }); - EasyMock.replay(transportLayer); - try { authenticator.authenticate(); fail("Expected authenticate() to raise an exception"); } catch (IllegalSaslStateException e) { // expected exception } + + verify(transportLayer, times(2)).read(any(ByteBuffer.class)); } private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer, String mechanism) throws IOException { diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java index 51d60123c7f..2ecde99b704 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; import java.io.IOException; import java.util.Collections; @@ -40,7 +42,6 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.auth.SaslExtensionsCallback; import org.apache.kafka.common.security.auth.SaslExtensions; -import org.easymock.EasyMock; import org.junit.Test; public class OAuthBearerLoginModuleTest { @@ -124,12 +125,10 @@ public void login1Commit1Login2Commit2Logout1Login3Commit3Logout2() throws Login Set<Object> publicCredentials = subject.getPublicCredentials(); // Create callback handler - OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class), - EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)}; - SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class), - EasyMock.mock(SaslExtensions.class), EasyMock.mock(SaslExtensions.class)}; - EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing - EasyMock.replay(extensions[0], extensions[2]); + OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class), + mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)}; + SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class), + mock(SaslExtensions.class), mock(SaslExtensions.class)}; TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions); // Create login modules @@ -207,6 +206,9 @@ public void login1Commit1Login2Commit2Logout1Login3Commit3Logout2() throws Login assertEquals(1, publicCredentials.size()); assertSame(tokens[2], privateCredentials.iterator().next()); assertSame(extensions[2], publicCredentials.iterator().next()); + + verifyZeroInteractions((Object[]) tokens); + verifyZeroInteractions((Object[]) extensions); } @Test @@ -220,12 +222,10 @@ public void login1Commit1Logout1Login2Commit2Logout2() throws LoginException { Set<Object> publicCredentials = subject.getPublicCredentials(); // Create callback handler - OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class), - EasyMock.mock(OAuthBearerToken.class)}; - SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class), - EasyMock.mock(SaslExtensions.class)}; - EasyMock.replay(tokens[0], tokens[1]); // expect nothing - EasyMock.replay(extensions[0], extensions[1]); + OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class), + mock(OAuthBearerToken.class)}; + SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class), + mock(SaslExtensions.class)}; TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions); // Create login modules @@ -268,6 +268,9 @@ public void login1Commit1Logout1Login2Commit2Logout2() throws LoginException { // Should have nothing again assertEquals(0, privateCredentials.size()); assertEquals(0, publicCredentials.size()); + + verifyZeroInteractions((Object[]) tokens); + verifyZeroInteractions((Object[]) extensions); } @Test @@ -280,12 +283,10 @@ public void loginAbortLoginCommitLogout() throws LoginException { Set<Object> publicCredentials = subject.getPublicCredentials(); // Create callback handler - OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class), - EasyMock.mock(OAuthBearerToken.class)}; - SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class), - EasyMock.mock(SaslExtensions.class)}; - EasyMock.replay(tokens[0], tokens[1]); // expect nothing - EasyMock.replay(extensions[0], extensions[1]); + OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class), + mock(OAuthBearerToken.class)}; + SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class), + mock(SaslExtensions.class)}; TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions); // Create login module @@ -319,6 +320,9 @@ public void loginAbortLoginCommitLogout() throws LoginException { // Should have nothing again assertEquals(0, privateCredentials.size()); assertEquals(0, publicCredentials.size()); + + verifyZeroInteractions((Object[]) tokens); + verifyZeroInteractions((Object[]) extensions); } @Test @@ -332,12 +336,10 @@ public void login1Commit1Login2Abort2Login3Commit3Logout3() throws LoginExceptio Set<Object> publicCredentials = subject.getPublicCredentials(); // Create callback handler - OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class), - EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)}; - SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class), - EasyMock.mock(SaslExtensions.class), EasyMock.mock(SaslExtensions.class)}; - EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing - EasyMock.replay(extensions[0], extensions[1], extensions[2]); + OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class), + mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)}; + SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class), + mock(SaslExtensions.class), mock(SaslExtensions.class)}; TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions); // Create login modules @@ -402,6 +404,9 @@ public void login1Commit1Login2Abort2Login3Commit3Logout3() throws LoginExceptio assertSame(tokens[2], privateCredentials.iterator().next()); assertEquals(1, publicCredentials.size()); assertSame(extensions[2], publicCredentials.iterator().next()); + + verifyZeroInteractions((Object[]) tokens); + verifyZeroInteractions((Object[]) extensions); } /** @@ -413,9 +418,8 @@ public void commitDoesNotThrowOnUnsupportedExtensionsCallback() throws LoginExce Subject subject = new Subject(); // Create callback handler - OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class), - EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)}; - EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing + OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class), + mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)}; TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, new SaslExtensions[] {RAISE_UNSUPPORTED_CB_EXCEPTION_FLAG}); // Create login modules @@ -429,5 +433,7 @@ public void commitDoesNotThrowOnUnsupportedExtensionsCallback() throws LoginExce SaslExtensions extensions = subject.getPublicCredentials(SaslExtensions.class).iterator().next(); assertNotNull(extensions); assertTrue(extensions.map().isEmpty()); + + verifyZeroInteractions((Object[]) tokens); } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java index fad743136f3..6d23f620599 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.security.auth.SaslExtensions; import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; -import org.easymock.EasyMockSupport; import org.junit.Test; import javax.security.auth.callback.Callback; @@ -39,7 +38,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -public class OAuthBearerSaslClientTest extends EasyMockSupport { +public class OAuthBearerSaslClientTest { private static final Map<String, String> TEST_PROPERTIES = new LinkedHashMap<String, String>() { { diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java index 9c62bef427a..cc0b9835073 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java @@ -19,6 +19,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Date; @@ -42,8 +45,8 @@ import org.apache.kafka.common.utils.MockScheduler; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.easymock.EasyMock; import org.junit.Test; +import org.mockito.InOrder; public class ExpiringCredentialRefreshingLoginTest { private static final Configuration EMPTY_WILDCARD_CONFIGURATION; @@ -257,24 +260,8 @@ public void testRefresh() throws Exception { for (int numExpectedRefreshes : new int[] {0, 1, 2}) { for (boolean clientReloginAllowedBeforeLogout : new boolean[] {true, false}) { Subject subject = new Subject(); - /* - * Create a mock and record the fact that we expect login() to be invoked - * followed by getSubject() and then ultimately followed by numExpectedRefreshes - * pairs of either login()/logout() or logout()/login() calls - */ - final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class); - mockLoginContext.login(); - EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject); - for (int i = 0; i < numExpectedRefreshes; ++i) { - if (clientReloginAllowedBeforeLogout) { - mockLoginContext.login(); - mockLoginContext.logout(); - } else { - mockLoginContext.logout(); - mockLoginContext.login(); - } - } - EasyMock.replay(mockLoginContext); + final LoginContext mockLoginContext = mock(LoginContext.class); + when(mockLoginContext.getSubject()).thenReturn(subject); MockTime mockTime = new MockTime(); long startMs = mockTime.milliseconds(); @@ -335,6 +322,23 @@ public void testRefresh() throws Exception { assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs); } assertFalse(waiters.get(numExpectedRefreshes).isDone()); + + /* + * We expect login() to be invoked followed by getSubject() and then ultimately followed by + * numExpectedRefreshes pairs of either login()/logout() or logout()/login() calls + */ + InOrder inOrder = inOrder(mockLoginContext); + inOrder.verify(mockLoginContext).login(); + inOrder.verify(mockLoginContext).getSubject(); + for (int i = 0; i < numExpectedRefreshes; ++i) { + if (clientReloginAllowedBeforeLogout) { + inOrder.verify(mockLoginContext).login(); + inOrder.verify(mockLoginContext).logout(); + } else { + inOrder.verify(mockLoginContext).logout(); + inOrder.verify(mockLoginContext).login(); + } + } } } } @@ -343,20 +347,9 @@ public void testRefresh() throws Exception { public void testRefreshWithExpirationSmallerThanConfiguredBuffers() throws Exception { int numExpectedRefreshes = 1; boolean clientReloginAllowedBeforeLogout = true; + final LoginContext mockLoginContext = mock(LoginContext.class); Subject subject = new Subject(); - /* - * Create a mock and record the fact that we expect login() to be invoked - * followed by getSubject() and then ultimately followed by numExpectedRefreshes - * pairs of login()/logout() calls - */ - final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class); - mockLoginContext.login(); - EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject); - for (int i = 0; i < numExpectedRefreshes; ++i) { - mockLoginContext.login(); - mockLoginContext.logout(); - } - EasyMock.replay(mockLoginContext); + when(mockLoginContext.getSubject()).thenReturn(subject); MockTime mockTime = new MockTime(); long startMs = mockTime.milliseconds(); @@ -419,6 +412,13 @@ public void testRefreshWithExpirationSmallerThanConfiguredBuffers() throws Excep assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs); } assertFalse(waiters.get(numExpectedRefreshes).isDone()); + + InOrder inOrder = inOrder(mockLoginContext); + inOrder.verify(mockLoginContext).login(); + for (int i = 0; i < numExpectedRefreshes; ++i) { + inOrder.verify(mockLoginContext).login(); + inOrder.verify(mockLoginContext).logout(); + } } @Test @@ -426,19 +426,8 @@ public void testRefreshWithMinPeriodIntrusion() throws Exception { int numExpectedRefreshes = 1; boolean clientReloginAllowedBeforeLogout = true; Subject subject = new Subject(); - /* - * Create a mock and record the fact that we expect login() to be invoked - * followed by getSubject() and then ultimately followed by numExpectedRefreshes - * pairs of login()/logout() calls - */ - final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class); - mockLoginContext.login(); - EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject); - for (int i = 0; i < numExpectedRefreshes; ++i) { - mockLoginContext.login(); - mockLoginContext.logout(); - } - EasyMock.replay(mockLoginContext); + final LoginContext mockLoginContext = mock(LoginContext.class); + when(mockLoginContext.getSubject()).thenReturn(subject); MockTime mockTime = new MockTime(); long startMs = mockTime.milliseconds(); @@ -504,6 +493,13 @@ public void testRefreshWithMinPeriodIntrusion() throws Exception { waiter.get().longValue() - startMs); } assertFalse(waiters.get(numExpectedRefreshes).isDone()); + + InOrder inOrder = inOrder(mockLoginContext); + inOrder.verify(mockLoginContext).login(); + for (int i = 0; i < numExpectedRefreshes; ++i) { + inOrder.verify(mockLoginContext).login(); + inOrder.verify(mockLoginContext).logout(); + } } @Test @@ -511,19 +507,8 @@ public void testRefreshWithPreExpirationBufferIntrusion() throws Exception { int numExpectedRefreshes = 1; boolean clientReloginAllowedBeforeLogout = true; Subject subject = new Subject(); - /* - * Create a mock and record the fact that we expect login() to be invoked - * followed by getSubject() and then ultimately followed by numExpectedRefreshes - * pairs of login()/logout() calls - */ - final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class); - mockLoginContext.login(); - EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject); - for (int i = 0; i < numExpectedRefreshes; ++i) { - mockLoginContext.login(); - mockLoginContext.logout(); - } - EasyMock.replay(mockLoginContext); + final LoginContext mockLoginContext = mock(LoginContext.class); + when(mockLoginContext.getSubject()).thenReturn(subject); MockTime mockTime = new MockTime(); long startMs = mockTime.milliseconds(); @@ -588,6 +573,13 @@ public void testRefreshWithPreExpirationBufferIntrusion() throws Exception { waiter.get().longValue() - startMs); } assertFalse(waiters.get(numExpectedRefreshes).isDone()); + + InOrder inOrder = inOrder(mockLoginContext); + inOrder.verify(mockLoginContext).login(); + for (int i = 0; i < numExpectedRefreshes; ++i) { + inOrder.verify(mockLoginContext).login(); + inOrder.verify(mockLoginContext).logout(); + } } private static List<KafkaFutureImpl<Long>> addWaiters(MockScheduler mockScheduler, long refreshEveryMillis, diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index b258a342164..d5029b6ed2f 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -17,9 +17,8 @@ package org.apache.kafka.common.utils; import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; -import org.easymock.IAnswer; import org.junit.Test; +import org.mockito.stubbing.OngoingStubbing; import java.io.Closeable; import java.io.DataOutputStream; @@ -34,6 +33,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.kafka.common.utils.Utils.formatAddress; import static org.apache.kafka.common.utils.Utils.formatBytes; @@ -45,6 +46,12 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class UtilsTest { @@ -324,17 +331,15 @@ public void testReadFullyOrFailWithRealFile() throws IOException { */ @Test public void testReadFullyOrFailWithPartialFileChannelReads() throws IOException { - FileChannel channelMock = EasyMock.createMock(FileChannel.class); + FileChannel channelMock = mock(FileChannel.class); final int bufferSize = 100; ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - StringBuilder expectedBufferContent = new StringBuilder(); - fileChannelMockExpectReadWithRandomBytes(channelMock, expectedBufferContent, bufferSize); - EasyMock.replay(channelMock); + String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize); Utils.readFullyOrFail(channelMock, buffer, 0L, "test"); - assertEquals("The buffer should be populated correctly", expectedBufferContent.toString(), + assertEquals("The buffer should be populated correctly", expectedBufferContent, new String(buffer.array())); assertFalse("The buffer should be filled", buffer.hasRemaining()); - EasyMock.verify(channelMock); + verify(channelMock, atLeastOnce()).read(any(), anyLong()); } /** @@ -343,73 +348,62 @@ public void testReadFullyOrFailWithPartialFileChannelReads() throws IOException */ @Test public void testReadFullyWithPartialFileChannelReads() throws IOException { - FileChannel channelMock = EasyMock.createMock(FileChannel.class); + FileChannel channelMock = mock(FileChannel.class); final int bufferSize = 100; - StringBuilder expectedBufferContent = new StringBuilder(); - fileChannelMockExpectReadWithRandomBytes(channelMock, expectedBufferContent, bufferSize); - EasyMock.replay(channelMock); + String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize); ByteBuffer buffer = ByteBuffer.allocate(bufferSize); Utils.readFully(channelMock, buffer, 0L); - assertEquals("The buffer should be populated correctly.", expectedBufferContent.toString(), + assertEquals("The buffer should be populated correctly.", expectedBufferContent, new String(buffer.array())); assertFalse("The buffer should be filled", buffer.hasRemaining()); - EasyMock.verify(channelMock); + verify(channelMock, atLeastOnce()).read(any(), anyLong()); } @Test public void testReadFullyIfEofIsReached() throws IOException { - final FileChannel channelMock = EasyMock.createMock(FileChannel.class); + final FileChannel channelMock = mock(FileChannel.class); final int bufferSize = 100; final String fileChannelContent = "abcdefghkl"; ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - EasyMock.expect(channelMock.size()).andReturn((long) fileChannelContent.length()); - EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() { - @Override - public Integer answer() { - ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; - buffer.put(fileChannelContent.getBytes()); - return -1; - } + when(channelMock.read(any(), anyLong())).then(invocation -> { + ByteBuffer bufferArg = invocation.getArgument(0); + bufferArg.put(fileChannelContent.getBytes()); + return -1; }); - EasyMock.replay(channelMock); Utils.readFully(channelMock, buffer, 0L); assertEquals("abcdefghkl", new String(buffer.array(), 0, buffer.position())); - assertEquals(buffer.position(), channelMock.size()); + assertEquals(fileChannelContent.length(), buffer.position()); assertTrue(buffer.hasRemaining()); - EasyMock.verify(channelMock); + verify(channelMock, atLeastOnce()).read(any(), anyLong()); } /** * Expectation setter for multiple reads where each one reads random bytes to the buffer. * * @param channelMock The mocked FileChannel object - * @param expectedBufferContent buffer that will be updated to contain the expected buffer content after each - * `FileChannel.read` invocation * @param bufferSize The buffer size + * @return Expected buffer string * @throws IOException If an I/O error occurs */ - private void fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock, - final StringBuilder expectedBufferContent, - final int bufferSize) throws IOException { + private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock, + final int bufferSize) throws IOException { final int step = 20; final Random random = new Random(); int remainingBytes = bufferSize; + OngoingStubbing<Integer> when = when(channelMock.read(any(), anyLong())); + StringBuilder expectedBufferContent = new StringBuilder(); while (remainingBytes > 0) { - final int mockedBytesRead = remainingBytes < step ? remainingBytes : random.nextInt(step); - final StringBuilder sb = new StringBuilder(); - EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() { - @Override - public Integer answer() { - ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; - for (int i = 0; i < mockedBytesRead; i++) - sb.append("a"); - buffer.put(sb.toString().getBytes()); - expectedBufferContent.append(sb); - return mockedBytesRead; - } + final int bytesRead = remainingBytes < step ? remainingBytes : random.nextInt(step); + final String stringRead = IntStream.range(0, bytesRead).mapToObj(i -> "a").collect(Collectors.joining()); + expectedBufferContent.append(stringRead); + when = when.then(invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + buffer.put(stringRead.getBytes()); + return bytesRead; }); - remainingBytes -= mockedBytesRead; + remainingBytes -= bytesRead; } + return expectedBufferContent.toString(); } private static class TestCloseable implements Closeable { diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 23fc68aaba5..9ebf9e2261d 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -74,6 +74,7 @@ versions += [ lz4: "1.5.0", mavenArtifact: "3.5.4", metrics: "2.2.0", + mockito: "2.23.0", // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta powermock: "2.0.0-beta.5", reflections: "0.9.11", @@ -125,6 +126,7 @@ libs += [ log4j: "log4j:log4j:$versions.log4j", lz4: "org.lz4:lz4-java:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics", + mockitoCore: "org.mockito:mockito-core:$versions.mockito", powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock", powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock", reflections: "org.reflections:reflections:$versions.reflections", ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Replace EasyMock and PowerMock with Mockito in clients module > ------------------------------------------------------------- > > Key: KAFKA-7439 > URL: https://issues.apache.org/jira/browse/KAFKA-7439 > Project: Kafka > Issue Type: Sub-task > Reporter: Ismael Juma > Assignee: Ismael Juma > Priority: Major > > See parent task for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)