[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16615242#comment-16615242 ]

ASF GitHub Bot commented on KAFKA-7280:
---------------------------------------

hachikuji closed pull request #5495: KAFKA-7280: Synchronize consumer fetch request/response handling
URL: https://github.com/apache/kafka/pull/5495
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c84dd6f9b7b..86ee74ef80d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -93,7 +93,21 @@
 import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
 
 /**
- * This class manage the fetching process with the brokers.
+ * This class manages the fetching process with the brokers.
+ * <p>
+ * Thread-safety:
+ * Requests and responses of Fetcher may be processed by different threads since heartbeat
+ * thread may process responses. Other operations are single-threaded and invoked only from
+ * the thread polling the consumer.
+ * <ul>
+ *     <li>If a response handler accesses any shared state of the Fetcher (e.g. FetchSessionHandler),
+ *     all access to that state must be synchronized on the Fetcher instance.</li>
+ *     <li>If a response handler accesses any shared state of the coordinator (e.g. SubscriptionState),
+ *     it is assumed that all access to that state is synchronized on the coordinator instance by
+ *     the caller.</li>
+ *     <li>Responses that collate partial responses from multiple brokers (e.g. to list offsets) are
+ *     synchronized on the response future.</li>
+ * </ul>
  */
 public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final Logger log;
@@ -191,7 +205,7 @@ public boolean hasCompletedFetches() {
      * an in-flight fetch or pending fetch data.
      * @return number of fetches sent
      */
-    public int sendFetches() {
+    public synchronized int sendFetches() {
         Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
@@ -209,39 +223,43 @@ public int sendFetches() {
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
                         public void onSuccess(ClientResponse resp) {
-                            FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
-                            FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
-                            if (handler == null) {
-                                log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
-                                    fetchTarget.id());
-                                return;
+                            synchronized (Fetcher.this) {
+                                FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
+                                FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+                                if (handler == null) {
+                                    log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
+                                            fetchTarget.id());
+                                    return;
+                                }
+                                if (!handler.handleResponse(response)) {
+                                    return;
+                                }
+
+                                Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
+                                FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
+
+                                for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
+                                    TopicPartition partition = entry.getKey();
+                                    long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
+                                    FetchResponse.PartitionData fetchData = entry.getValue();
+
+                                    log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
+                                            isolationLevel, fetchOffset, partition, fetchData);
+                                    completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
+                                            resp.requestHeader().apiVersion()));
+                                }
+
+                                sensors.fetchLatency.record(resp.requestLatencyMs());
                             }
-                            if (!handler.handleResponse(response)) {
-                                return;
-                            }
-
-                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
-                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
-
-                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
-                                TopicPartition partition = entry.getKey();
-                                long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
-                                FetchResponse.PartitionData fetchData = entry.getValue();
-
-                                log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
-                                        isolationLevel, fetchOffset, partition, fetchData);
-                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
-                                        resp.requestHeader().apiVersion()));
-                            }
-
-                            sensors.fetchLatency.record(resp.requestLatencyMs());
                         }
 
                         @Override
                         public void onFailure(RuntimeException e) {
-                            FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
-                            if (handler != null) {
-                                handler.handleError(e);
+                            synchronized (Fetcher.this) {
+                                FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+                                if (handler != null) {
+                                    handler.handleError(e);
+                                }
                             }
                         }
                     });
@@ -864,7 +882,7 @@ public ListOffsetResult() {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 FetchSessionHandler.Builder builder = fetchable.get(node);
                 if (builder == null) {
-                    FetchSessionHandler handler = sessionHandlers.get(node.id());
+                    FetchSessionHandler handler = sessionHandler(node.id());
                     if (handler == null) {
                         handler = new FetchSessionHandler(logContext, node.id());
                         sessionHandlers.put(node.id(), handler);
@@ -1060,6 +1078,11 @@ public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopi
         clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
     }
 
+    // Visibility for testing
+    protected FetchSessionHandler sessionHandler(int node) {
+        return sessionHandlers.get(node);
+    }
+
     public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
         Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
         fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 3bf3deba1aa..1ca0fde2698 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
@@ -88,6 +89,7 @@
 import org.junit.Test;
 
 import java.io.DataOutputStream;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -101,6 +103,13 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singleton;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
@@ -112,6 +121,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+
 @SuppressWarnings("deprecation")
 public class FetcherTest {
     private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
@@ -149,38 +159,30 @@
     private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
     private Metrics fetcherMetrics = new Metrics(time);
     private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
+    private ExecutorService executorService;
 
     @Before
     public void setup() throws Exception {
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         client.setNode(node);
 
-        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
-        builder.append(0L, "key".getBytes(), "value-1".getBytes());
-        builder.append(0L, "key".getBytes(), "value-2".getBytes());
-        builder.append(0L, "key".getBytes(), "value-3".getBytes());
-        records = builder.build();
-
-        builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
-        builder.append(0L, "key".getBytes(), "value-4".getBytes());
-        builder.append(0L, "key".getBytes(), "value-5".getBytes());
-        nextRecords = builder.build();
-
-        builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        emptyRecords = builder.build();
-
-        builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
-        builder.append(0L, "key".getBytes(), "value-0".getBytes());
-        partialRecords = builder.build();
+        records = buildRecords(1L, 3, 1);
+        nextRecords = buildRecords(4L, 2, 4);
+        emptyRecords = buildRecords(0L, 0, 0);
+        partialRecords = buildRecords(4L, 1, 0);
         partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000);
     }
 
     @After
-    public void teardown() {
+    public void teardown() throws Exception {
         this.metrics.close();
         this.fetcherMetrics.close();
         this.fetcher.close();
         this.fetcherMetrics.close();
+        if (executorService != null) {
+            executorService.shutdownNow();
+            assertTrue(executorService.awaitTermination(5, TimeUnit.SECONDS));
+        }
     }
 
     @Test
@@ -2509,6 +2511,141 @@ public void testConsumingViaIncrementalFetchRequests() {
         assertEquals(5, records.get(1).offset());
     }
 
+    @Test
+    public void testFetcherConcurrency() throws Exception {
+        int numPartitions = 20;
+        Set<TopicPartition> topicPartitions = new HashSet<>();
+        for (int i = 0; i < numPartitions; i++)
+            topicPartitions.add(new TopicPartition(topicName, i));
+        cluster = TestUtils.singletonCluster(topicName, numPartitions);
+        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        client.setNode(node);
+        fetchSize = 10000;
+
+        Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(
+                new LogContext(),
+                consumerClient,
+                minBytes,
+                maxBytes,
+                maxWaitMs,
+                fetchSize,
+                2 * numPartitions,
+                true,
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(),
+                metadata,
+                subscriptions,
+                metrics,
+                metricsRegistry,
+                time,
+                retryBackoffMs,
+                requestTimeoutMs,
+                IsolationLevel.READ_UNCOMMITTED) {
+            @Override
+            protected FetchSessionHandler sessionHandler(int id) {
+                final FetchSessionHandler handler = super.sessionHandler(id);
+                if (handler == null)
+                    return null;
+                else {
+                    return new FetchSessionHandler(new LogContext(), id) {
+                        @Override
+                        public Builder newBuilder() {
+                            verifySessionPartitions();
+                            return handler.newBuilder();
+                        }
+
+                        @Override
+                        public boolean handleResponse(FetchResponse response) {
+                            verifySessionPartitions();
+                            return handler.handleResponse(response);
+                        }
+
+                        @Override
+                        public void handleError(Throwable t) {
+                            verifySessionPartitions();
+                            handler.handleError(t);
+                        }
+
+                        // Verify that session partitions can be traversed safely.
+                        private void verifySessionPartitions() {
+                            try {
+                                Field field = FetchSessionHandler.class.getDeclaredField("sessionPartitions");
+                                field.setAccessible(true);
+                                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> sessionPartitions =
+                                        (LinkedHashMap<TopicPartition, FetchRequest.PartitionData>) field.get(handler);
+                                for (Map.Entry<TopicPartition, FetchRequest.PartitionData> entry : sessionPartitions.entrySet()) {
+                                    // If `sessionPartitions` are modified on another thread, Thread.yield will increase the
+                                    // possibility of ConcurrentModificationException if appropriate synchronization is not used.
+                                    Thread.yield();
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    };
+                }
+            }
+        };
+
+        subscriptions.assignFromUser(topicPartitions);
+        topicPartitions.forEach(tp -> subscriptions.seek(tp, 0L));
+
+        AtomicInteger fetchesRemaining = new AtomicInteger(1000);
+        executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(() -> {
+            while (fetchesRemaining.get() > 0) {
+                synchronized (consumerClient) {
+                    if (!client.requests().isEmpty()) {
+                        ClientRequest request = client.requests().peek();
+                        FetchRequest fetchRequest = (FetchRequest) request.requestBuilder().build();
+                        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseMap = new LinkedHashMap<>();
+                        for (Map.Entry<TopicPartition, FetchRequest.PartitionData> entry : fetchRequest.fetchData().entrySet()) {
+                            TopicPartition tp = entry.getKey();
+                            long offset = entry.getValue().fetchOffset;
+                            responseMap.put(tp, new FetchResponse.PartitionData<>(Errors.NONE, offset + 2L, offset + 2,
+                                    0L, null, buildRecords(offset, 2, offset)));
+                        }
+                        client.respondToRequest(request, new FetchResponse<>(Errors.NONE, responseMap, 0, 123));
+                        consumerClient.poll(time.timer(0));
+                    }
+                }
+            }
+            return fetchesRemaining.get();
+        });
+        Map<TopicPartition, Long> nextFetchOffsets = topicPartitions.stream()
+                .collect(Collectors.toMap(Function.identity(), t -> 0L));
+        while (fetchesRemaining.get() > 0 && !future.isDone()) {
+            if (fetcher.sendFetches() == 1) {
+                synchronized (consumerClient) {
+                    consumerClient.poll(time.timer(0));
+                }
+            }
+            if (fetcher.hasCompletedFetches()) {
+                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+                if (!fetchedRecords.isEmpty()) {
+                    fetchesRemaining.decrementAndGet();
+                    fetchedRecords.entrySet().forEach(entry -> {
+                        TopicPartition tp = entry.getKey();
+                        List<ConsumerRecord<byte[], byte[]>> records = entry.getValue();
+                        assertEquals(2, records.size());
+                        long nextOffset = nextFetchOffsets.get(tp);
+                        assertEquals(nextOffset, records.get(0).offset());
+                        assertEquals(nextOffset + 1, records.get(1).offset());
+                        nextFetchOffsets.put(tp, nextOffset + 2);
+                    });
+                }
+            }
+        }
+        assertEquals(0, future.get());
+    }
+
+    private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
+        for (int i = 0; i < count; i++)
+            builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes());
+        return builder.build();
+    }
+
     private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, int baseSequence, SimpleRecord... records) {
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
                 TimestampType.CREATE_TIME, baseOffset, time.milliseconds(), pid, (short) 0, baseSequence, true,
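
For readers skimming the diff above: the core of the fix is that the request path (the now-synchronized sendFetches) and the asynchronous response/error listeners take the same monitor, the Fetcher instance, before touching FetchSessionHandler state such as sessionPartitions. Below is a minimal standalone sketch of that pattern with hypothetical class and field names; it is not Kafka code.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of the locking pattern applied by the patch: the request-building path
// and the asynchronous response callback synchronize on the same owning instance,
// so the session map is never iterated on one thread while another mutates it.
public class FetchLockingSketch {
    // Stands in for FetchSessionHandler#sessionPartitions.
    private final Map<String, Long> sessionPartitions = new LinkedHashMap<>();

    // Analogous to the now-synchronized Fetcher#sendFetches().
    public synchronized int sendFetches() {
        sessionPartitions.put("test_topic-" + sessionPartitions.size(), 0L);
        return sessionPartitions.size();
    }

    // Analogous to the RequestFutureListener: the callback may run on the
    // heartbeat thread, so it takes the same lock before reading shared state.
    public void onResponse(CompletableFuture<Long> response) {
        response.thenAccept(offset -> {
            synchronized (FetchLockingSketch.this) {
                // Safe to iterate/mutate: sendFetches() cannot run concurrently.
                sessionPartitions.replaceAll((tp, old) -> offset);
            }
        });
    }

    public static void main(String[] args) {
        FetchLockingSketch sketch = new FetchLockingSketch();
        CompletableFuture<Long> response = new CompletableFuture<>();
        sketch.onResponse(response);
        sketch.sendFetches();
        response.complete(42L); // in real use this may complete on another thread
        System.out.println(sketch.sendFetches());
    }
}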


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7280
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7280
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 1.1.1, 2.0.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Critical
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe, but it is 
> used in the Kafka consumer without any synchronization even though poll() 
> from the heartbeat thread can process responses. The heartbeat thread holds the 
> coordinator lock while processing its poll and responses, which makes other 
> operations involving the group coordinator safe. We also need to lock 
> FetchSessionHandler for the operations that update or read 
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of 
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
>  at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>  at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>  
> The logs just prior to the exception show that a partition was removed from 
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-1 because there is an 
> in-flight request to worker4:9095 (id: 3 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Completed receive from node 2 for FETCH with correlation id 
> 417, received 
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},
> record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Added READ_UNCOMMITTED fetch request for partition 
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
> (org.apache.kafka.clients.FetchSessionHandler)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), 
> toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 
> 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Sending FETCH 
> {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=109800960,epoch=237,topics=[],forgotten_topics_data=[{topic=test_topic,partitions=[2]}]} 
> with correlation id 418 to node 2 (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-2 because there is an 
> in-flight request to worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
> {quote}
> The sequence in the logs shows:
>  # FETCH response received
>  # FetchSessionHandler#sessionPartitions is updated (a partition is removed)
>  # New FETCH request is sent
>  # Heartbeat thread throws ConcurrentModificationException while iterating 
> over FetchSessionHandler#sessionPartitions
> This could be because steps 1 and 4 ran on the heartbeat thread while steps 2 and 3 
> ran on the thread processing Consumer#poll().
>  
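
As an aside, the failure mode described above is easy to reproduce outside Kafka: iterate a LinkedHashMap on one thread (as FetchSessionHandler.responseDataToLogString does over sessionPartitions) while another thread removes an entry (as building the next incremental fetch request does). A minimal, hypothetical sketch follows; it is not taken from the Kafka code base.

import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

// One thread walks the map while another removes and re-adds entries.
// LinkedHashMap's iterator is fail-fast, so the reading thread typically dies
// with ConcurrentModificationException, like the heartbeat thread in the logs above.
public class SessionPartitionsRace {
    public static void main(String[] args) throws Exception {
        Map<String, Integer> sessionPartitions = new LinkedHashMap<>();
        for (int i = 0; i < 1000; i++)
            sessionPartitions.put("test_topic-" + i, i);

        Thread heartbeat = new Thread(() -> {
            // "Heartbeat" thread: repeatedly traverses the map, like logging a response.
            for (int i = 0; i < 100_000; i++) {
                try {
                    for (Map.Entry<String, Integer> entry : sessionPartitions.entrySet())
                        Thread.yield();
                } catch (ConcurrentModificationException cme) {
                    System.out.println("Reproduced: " + cme);
                    return;
                }
            }
        });
        heartbeat.start();

        // "Poll" thread: removes a partition from the session and adds another,
        // like building an incremental fetch request that forgets a partition.
        for (int i = 0; i < 100_000 && heartbeat.isAlive(); i++) {
            sessionPartitions.remove("test_topic-" + (i % 1000));
            sessionPartitions.put("test_topic-" + (i % 1000), i);
        }
        heartbeat.join();
    }
}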



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
