This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new 0acab27 MINOR: Make MockClient#poll() more thread-safe (#5942) 0acab27 is described below commit 0acab27b3b09a5ae3b8d22f86943b1cf02a3dea9 Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Thu Feb 14 17:59:36 2019 +0000 MINOR: Make MockClient#poll() more thread-safe (#5942) It used to preallocate an array of responses and then complete each response from the original collection sequentially. The problem was that the original collection could have been modified (another thread completing the response) while this was hapenning --- clients/src/test/java/org/apache/kafka/clients/MockClient.java | 4 ++-- .../java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 70ffeae..4bd5e54 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -280,8 +280,6 @@ public class MockClient implements KafkaClient { maybeAwaitWakeup(); checkTimeoutOfPendingRequests(now); - List<ClientResponse> copy = new ArrayList<>(this.responses); - if (metadata != null && metadata.updateRequested()) { MetadataUpdate metadataUpdate = metadataUpdates.poll(); if (cluster != null) @@ -300,9 +298,11 @@ public class MockClient implements KafkaClient { } } + List<ClientResponse> copy = new ArrayList<>(); ClientResponse response; while ((response = this.responses.poll()) != null) { response.onComplete(); + copy.add(response); } return copy; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index e77b48b..5ec4d18 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -33,6 +33,10 @@ import java.util.Map; * To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses * for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient * and receive the responses you provided. + * + * Since {@link #kafkaClient() MockClient} is not thread-safe, + * users should be wary of calling its methods after the {@link #adminClient() AdminClient} is instantiated. + * * <p> * When finished, be sure to {@link #close() close} the environment object. */