This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new f15678e MINOR: Make MockClient#poll() more thread-safe (#5942) f15678e is described below commit f15678e203b0efb2ece45941238e3557d225bd67 Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Thu Feb 14 17:59:36 2019 +0000 MINOR: Make MockClient#poll() more thread-safe (#5942) It used to preallocate an array of responses and then complete each response from the original collection sequentially. The problem was that the original collection could have been modified (another thread completing the response) while this was hapenning --- clients/src/test/java/org/apache/kafka/clients/MockClient.java | 3 ++- .../java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 11822ac..eb25836 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -280,7 +280,6 @@ public class MockClient implements KafkaClient { maybeAwaitWakeup(); checkTimeoutOfPendingRequests(now); - List<ClientResponse> copy = new ArrayList<>(this.responses); if (metadata != null && metadata.updateRequested()) { MetadataUpdate metadataUpdate = metadataUpdates.poll(); if (cluster != null) @@ -299,9 +298,11 @@ public class MockClient implements KafkaClient { } } + List<ClientResponse> copy = new ArrayList<>(); ClientResponse response; while ((response = this.responses.poll()) != null) { response.onComplete(); + copy.add(response); } return copy; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index e77b48b..5ec4d18 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -33,6 +33,10 @@ import java.util.Map; * To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses * for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient * and receive the responses you provided. + * + * Since {@link #kafkaClient() MockClient} is not thread-safe, + * users should be wary of calling its methods after the {@link #adminClient() AdminClient} is instantiated. + * * <p> * When finished, be sure to {@link #close() close} the environment object. */