This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new f15678e  MINOR: Make MockClient#poll() more thread-safe (#5942)
f15678e is described below

commit f15678e203b0efb2ece45941238e3557d225bd67
Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>
AuthorDate: Thu Feb 14 17:59:36 2019 +0000

    MINOR: Make MockClient#poll() more thread-safe (#5942)
    
    It used to preallocate an array of responses and then complete each 
response from the original collection sequentially. The problem was that the 
original collection could have been modified (another thread completing the 
response) while this was hapenning
---
 clients/src/test/java/org/apache/kafka/clients/MockClient.java        | 3 ++-
 .../java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java   | 4 ++++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 11822ac..eb25836 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -280,7 +280,6 @@ public class MockClient implements KafkaClient {
         maybeAwaitWakeup();
         checkTimeoutOfPendingRequests(now);
 
-        List<ClientResponse> copy = new ArrayList<>(this.responses);
         if (metadata != null && metadata.updateRequested()) {
             MetadataUpdate metadataUpdate = metadataUpdates.poll();
             if (cluster != null)
@@ -299,9 +298,11 @@ public class MockClient implements KafkaClient {
             }
         }
 
+        List<ClientResponse> copy = new ArrayList<>();
         ClientResponse response;
         while ((response = this.responses.poll()) != null) {
             response.onComplete();
+            copy.add(response);
         }
 
         return copy;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index e77b48b..5ec4d18 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -33,6 +33,10 @@ import java.util.Map;
  * To use in a test, create an instance and prepare its {@link #kafkaClient() 
MockClient} with the expected responses
  * for the {@link AdminClient}. Then, use the {@link #adminClient() 
AdminClient} in the test, which will then use the MockClient
  * and receive the responses you provided.
+ *
+ * Since {@link #kafkaClient() MockClient} is not thread-safe,
+ * users should be wary of calling its methods after the {@link #adminClient() 
AdminClient} is instantiated.
+ *
  * <p>
  * When finished, be sure to {@link #close() close} the environment object.
  */

Reply via email to