This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 0acab27  MINOR: Make MockClient#poll() more thread-safe (#5942)
0acab27 is described below

commit 0acab27b3b09a5ae3b8d22f86943b1cf02a3dea9
Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>
AuthorDate: Thu Feb 14 17:59:36 2019 +0000

    MINOR: Make MockClient#poll() more thread-safe (#5942)
    
    MockClient#poll() used to preallocate an array of responses and then complete each 
response from the original collection sequentially. The problem was that the 
original collection could have been modified (another thread completing the 
response) while this was happening.
---
 clients/src/test/java/org/apache/kafka/clients/MockClient.java        | 4 ++--
 .../java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java   | 4 ++++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 70ffeae..4bd5e54 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -280,8 +280,6 @@ public class MockClient implements KafkaClient {
         maybeAwaitWakeup();
         checkTimeoutOfPendingRequests(now);
 
-        List<ClientResponse> copy = new ArrayList<>(this.responses);
-
         if (metadata != null && metadata.updateRequested()) {
             MetadataUpdate metadataUpdate = metadataUpdates.poll();
             if (cluster != null)
@@ -300,9 +298,11 @@ public class MockClient implements KafkaClient {
             }
         }
 
+        List<ClientResponse> copy = new ArrayList<>();
         ClientResponse response;
         while ((response = this.responses.poll()) != null) {
             response.onComplete();
+            copy.add(response);
         }
 
         return copy;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index e77b48b..5ec4d18 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -33,6 +33,10 @@ import java.util.Map;
  * To use in a test, create an instance and prepare its {@link #kafkaClient() 
MockClient} with the expected responses
  * for the {@link AdminClient}. Then, use the {@link #adminClient() 
AdminClient} in the test, which will then use the MockClient
  * and receive the responses you provided.
+ *
+ * Since {@link #kafkaClient() MockClient} is not thread-safe,
+ * users should be wary of calling its methods after the {@link #adminClient() 
AdminClient} is instantiated.
+ *
  * <p>
  * When finished, be sure to {@link #close() close} the environment object.
  */

Reply via email to