[ https://issues.apache.org/jira/browse/KAFKA-13425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-13425:
----------------------------------
    Labels: new-consumer-threading-should-fix  (was: )

> KafkaConsumer#pause() will lose its effect after groupRebalance occurs, which maybe cause data loss on the consumer side
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13425
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13425
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>              Labels: new-consumer-threading-should-fix
>         Attachments: architecture_picture.png
>
>
> h1. Foreword:
> Since I want to decouple polling messages from consuming them on the KafkaConsumer side, I use a "poll --> push" architecture model on the Kafka consumer side.
>
> h2. Architecture
> See the attached picture "architecture_picture".
> h3. 1) ThreadPoolExecutor
> The key parameters of the ThreadPoolExecutor threadPool are:
> h4. (1) The workQueue type is ArrayBlockingQueue<Runnable>
> h4. (2) The handler uses the RejectedExecutionHandler interface
> h4. (3) threadPool.allowCoreThreadTimeOut(true);
>
> h3. 2) KafkaConsumer
> The disadvantage of this architecture is that if the business side’s onMessage() method is slow to execute, it will lead to:
> h4. (1) The blockingQueue of the ThreadPoolExecutor accumulates a large number of tasks, and eventually pushing a message fails.
> h4. (2) How to deal with the KafkaConsumer poll() method when the push fails:
> h5. 1. Stop calling poll()
> KafkaConsumer needs to set
> *configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);*
> This prevents the heartbeat thread of KafkaConsumer from discovering that poll() has not been called for a long time and automatically executing *maybeLeaveGroup("consumer poll timeout has expired.")*;
> But the most serious consequence of this is that once a rebalance of the group is triggered for some reason, *the rebalance of the entire group cannot complete, because this kafkaConsumer does not call the poll() method. This causes all consumers in the group to stop consuming.*
> h5. 2. Keep calling poll() when pushing a message fails
> The purpose is to keep kafkaConsumer calling poll(), but at this time KafkaConsumer should not poll any messages, because the downstream BlockingQueue that stores messages is full. So here we need the help of KafkaConsumer#pause(...) and KafkaConsumer#resume(...). I named this special poll method maintainPoll4Rebalance().
>
> h1. maintainPoll4Rebalance preliminary design ideas:
> A simple design in code:
> {code:java}
> public static void main(String[] args) {
>     while (true) {
>         try {
>             ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>             for (ConsumerRecord<String, Object> message : records) {
>                 // while the push fails, keep polling (paused) so the consumer stays in the group
>                 while (!publish(message)) {
>                     try {
>                         maintainPoll4Rebalance();
>                     } catch (Exception e) {
>                         log.error("maintain poll for rebalance with error {}", e.getMessage(), e);
>                         CommonUtil.sleep(TimeUnit.SECONDS, 1);
>                     }
>                 }
>             }
>         } catch (Exception e) {
>             log.error("KafkaConsumer poll message has error: {}", e.getMessage(), e);
>             CommonUtil.sleep(TimeUnit.MILLISECONDS, ClientConfig.CLIENT_SLEEP_INTERVAL_MS);
>         }
>     }
> }
> private boolean publish(Object message) {
>     try {
>         ...
>         threadPool.execute(() -> onMessage(message));
>     } catch (RejectedExecutionException e) {
>         log.error("consumer execute failed with error{}", e.getMessage(), e);
>         return false;
>     } catch (Exception e) {
>         log.error("consumer execute failed with error{}", e.getMessage(), e);
>         return false;
>     }
>     return true;
> }
> private void maintainPoll4Rebalance() {
>     try {
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>         if (!records.isEmpty()) {
>             log.error("kafka poll for rebalance discard some record!");
>             for (ConsumerRecord<String, Object> consumerRecord : records) {
>                 if (consumerRecord != null) {
>                     log.error("this record need to retry, partition {} ,offset {}", consumerRecord.partition(), consumerRecord.offset());
>                 }
>             }
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>     } finally {
>         kafkaConsumer.resume(kafkaConsumer.assignment());
>     }
> }
> {code}
>
> The maintainPoll4Rebalance() code above seems to be a good solution to my problem. When downstream consumption is blocked, KafkaConsumer keeps calling the poll method, and it is prevented from pulling more messages while the push is failing.
> But in reality, the following logs appear during operation:
> {code:java}
> [main] ERROR ConsumerTest3 - kafka poll for rebalance discard some record!
> [main] ERROR ConsumerTest3 - this record need to retry, partition 0 ,offset 36901
> {code}
> I clearly called kafkaConsumer.pause(kafkaConsumer.assignment()) before calling kafkaConsumer#poll. Why does kafkaConsumer still pull the message and cause the message to be lost? The message is lost because the consumer has offset auto-commit enabled.
>
> h1. Root Cause Analysis
>
> KafkaConsumer#poll:
> 1) updateAssignmentMetadataIfNeeded
> 2) fetcher.fetchedRecords()
> 3) fetcher.sendFetches();
> These three methods are the three most critical operations in KafkaConsumer#poll. updateAssignmentMetadataIfNeeded is mainly responsible for group rebalance related work. The root cause lies in the first and second steps.
>
> h2. 1.updateAssignmentMetadataIfNeeded
>
> We trace directly to ConsumerCoordinator#onJoinPrepare(...)
> {code:java}
> else {
>     switch (protocol) {
>         case EAGER:
>             // revoke all partitions
>             revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>             exception = invokePartitionsRevoked(revokedPartitions);
>
>             subscriptions.assignFromSubscribed(Collections.emptySet());
>             break;
>         case COOPERATIVE:
>             // only revoke those partitions that are not in the subscription any more.
>             Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>             revokedPartitions = ownedPartitions.stream()
>                 .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
>                 .collect(Collectors.toSet());
>             if (!revokedPartitions.isEmpty()) {
>                 exception = invokePartitionsRevoked(revokedPartitions);
>                 ownedPartitions.removeAll(revokedPartitions);
>                 subscriptions.assignFromSubscribed(ownedPartitions);
>             }
>             break;
>     }
> }
> {code}
>
> For the value of the protocol instance variable used here, see its initialization code:
> {code:java}
> // select the rebalance protocol such that:
> // 1. only consider protocols that are supported by all the assignors. If there is no common protocols supported
> //    across all the assignors, throw an exception.
> // 2. if there are multiple protocols that are commonly supported, select the one with the highest id (i.e. the
> //    id number indicates how advanced the protocol is).
> // we know there are at least one assignor in the list, no need to double check for NPE
> if (!assignors.isEmpty()) {
>     List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
>     for (ConsumerPartitionAssignor assignor : assignors) {
>         supportedProtocols.retainAll(assignor.supportedProtocols());
>     }
>     if (supportedProtocols.isEmpty()) {
>         throw new IllegalArgumentException("Specified assignors " +
>             assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) +
>             " do not have commonly supported rebalance protocol");
>     }
>     Collections.sort(supportedProtocols);
>     protocol = supportedProtocols.get(supportedProtocols.size() - 1);
> } else {
>     protocol = null;
> }
> {code}
>
> After a simple analysis, we can see that as long as supportedProtocols contains the RebalanceProtocol.COOPERATIVE element, the protocol value will be COOPERATIVE; otherwise it will be EAGER.
> But checking the ConsumerPartitionAssignor interface, I found that all of its implementation classes except CooperativeStickyAssignor use the default:
> {code:java}
> /**
>  * Indicate which rebalance protocol this assignor works with; By default it
>  * should always work with ConsumerPartitionAssignor.RebalanceProtocol.EAGER.
>  */
> default List<RebalanceProtocol> supportedProtocols() {
>     return Collections.singletonList(RebalanceProtocol.EAGER);
> }
> {code}
>
> So the code will run to
> {code:java}
> case EAGER:
>     // revoke all partitions
>     revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>     exception = invokePartitionsRevoked(revokedPartitions);
>
>     subscriptions.assignFromSubscribed(Collections.emptySet());
>     break;
> {code}
>
> The problem is here:
> *subscriptions.assignFromSubscribed(Collections.emptySet()) will clear the assignment in my subscriptions, and with it {color:#ff0000}clear the paused mark of each TopicPartition.{color}*
> h2. 2.fetcher.fetchedRecords()
> There is no need to go into the code here. fetchedRecords checks the TopicPartition of each in-memory message set (CompletedFetch) against:
> h3. 1)if (subscriptions.isPaused(nextInLineFetch.partition))
> h3. 2)if (!subscriptions.isAssigned(completedFetch.partition))
> h3. 3)if (!subscriptions.isFetchable(completedFetch.partition))
>
> The problem is: suppose that within the pollTimer specified by the user, a poll(...) call completes the updateAssignmentMetadataIfNeeded operation and the method returns true, the paused flag of the TopicPartition has been cleared inside updateAssignmentMetadataIfNeeded, and the new assignment of kafkaConsumer still contains this TopicPartition after the rebalance is completed. Then the TopicPartition checks mentioned above will all pass.
> And *{color:#ff0000}because the nextInLineFetch variable in KafkaConsumer memory still holds messages for this TopicPartition,{color} the KafkaConsumer#poll(...) method will still return them after pause(...) has been called. Even if you always call pause(...) before each poll(...), it will return the messages of that TopicPartition.*
> If the business side cannot process the messages at this time, and the KafkaConsumer has offset auto-commit enabled, the messages will be lost on the consumer side. The maximum number of lost messages is max.poll.records.
> {code:java}
> try {
>     kafkaConsumer.pause(kafkaConsumer.assignment());
>     ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
>
> } catch (Exception e) {
>     log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
> } finally {
>     kafkaConsumer.resume(kafkaConsumer.assignment());
> }
> {code}
>
>
> h1. maintainPoll4Rebalance() Temporary solution
>
> The paused mark of the TopicPartition is restored in ConsumerRebalanceListener#onPartitionsAssigned(...):
> {code:java}
> private boolean maintainPoll4Rebalance;
>
> private void initKafkaConsumer() {
>     kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() {
>         @Override
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>             confirmMessageSync();
>             log.info("consumer on partition revoked!");
>         }
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             try {
>                 // re-apply pause() if a rebalance completed inside maintainPoll4Rebalance()
>                 if (maintainPoll4Rebalance) {
>                     kafkaConsumer.pause(kafkaConsumer.assignment());
>                 }
>             } catch (Exception e) {
>                 log.error("consumer onPartitionsAssigned failed with error:{}!", e.getMessage(), e);
>             }
>             log.info("consumer on partition assigned!");
>         }
>     });
> }
>
>
> private void maintainPoll4Rebalance() {
>     try {
>         maintainPoll4Rebalance = true;
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>         if (!records.isEmpty()) {
>             log.error("kafka poll for rebalance discard some record!");
>             for (ConsumerRecord<String, Object> consumerRecord : records) {
>                 if (consumerRecord != null) {
>                     log.error("this record need to retry, partition {} ,offset {}", consumerRecord.partition(), consumerRecord.offset());
>                 }
>             }
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>     } finally {
>         maintainPoll4Rebalance = false;
>         kafkaConsumer.resume(kafkaConsumer.assignment());
>     }
> }
> {code}
>
> In testing, this works around the problem: after calling kafkaConsumer#pause(...), kafkaConsumer.poll(...) no longer returns messages of the paused TopicPartitions.
> h1. Suggestions
> {{1. Precise semantics of kafkaConsumer#pause(…)}}
> First look at the comments on this method:
> {code:java}
> /*
>  * Suspend fetching from the requested partitions. Future calls to
>  * poll(Duration) will not return any records from these partitions until they
>  * have been resumed using resume(Collection). Note that this method does not
>  * affect partition subscription. In particular, it does not cause a group
>  * rebalance when automatic assignment is used.
>  * Params:
>  *   partitions – The partitions which should be paused
>  * Throws:
>  *   IllegalStateException – if any of the provided partitions are not currently
>  *   assigned to this consumer
>  */
> @Override
> public void pause(Collection<TopicPartition> partitions) {
>     acquireAndEnsureOpen();
>     try {
>         log.debug("Pausing partitions {}", partitions);
>         for (TopicPartition partition: partitions) {
>             subscriptions.pause(partition);
>         }
>     } finally {
>         release();
>     }
> }
> {code}
>
> The comments do not tell us that {color:#ff0000}the pause method will lose its effect after a groupRebalance.{color}
> And when the paused mark of the topicPartitions is cleared, kafkaConsumer does not output any log, so the user cannot notice that the pause(...) method no longer has any effect.
>
> {{2. When we execute invokePartitionsRevoked(revokedPartitions), should we also clean up the messages in KafkaConsumer memory that belong to revokedPartitions?}}
> If they are cleaned up, the cost is: after resume(...), kafkaConsumer needs to re-initiate FetchRequests for the resumed partitions, which brings additional network transmission.
>
> {{3. We should support a pause(...) method on the KafkaConsumer side that is not affected by groupRebalance}}
> 1) When the rebalance starts to prepare, add new logic to ConsumerCoordinator#onJoinPrepare(...)
> Before executing invokePartitionsRevoked(...) and subscriptions.assignFromSubscribed(...), collect the paused partitions (customerPausedPartitions) from the subscriptions.assignment of the current KafkaConsumer. customerPausedPartitions should be an instance variable of ConsumerCoordinator.
> {code:java}
> // New code, added in front of the following two statements
> customerPausedPartitions = subscriptions.pausedPartitions();
> exception = invokePartitionsRevoked(...);
> subscriptions.assignFromSubscribed(...);
> {code}
>
> 2) After the rebalance is completed, add new logic to ConsumerCoordinator#onJoinComplete(...)
> {code:java}
> protected void onJoinComplete(int generation,
>                               String memberId,
>                               String assignmentStrategy,
>                               ByteBuffer assignmentBuffer) {
>     log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);
>     ......
>     subscriptions.assignFromSubscribed(assignedPartitions);
>
>     // Add new code here: re-pause partitions that were paused before the rebalance and are still assigned
>     if (customerPausedPartitions != null && !customerPausedPartitions.isEmpty()) {
>         customerPausedPartitions.forEach(topicPartition -> {
>             if (subscriptions.isAssigned(topicPartition))
>                 subscriptions.pause(topicPartition);
>         });
>         customerPausedPartitions = null;
>     }
>     // Add partitions that were not previously owned but are now assigned
>     firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
>     ......
> }
> {code}
> The above is just a first draft of the modified code. It only guarantees that after a rebalance, the topicPartitions that are still part of the new assignment of KafkaConsumer keep their paused mark.
> *Note*: If the new assignment of kafkaConsumer no longer contains topicPartitions that were paused before the rebalance, the paused mark of these topicPartitions is lost permanently on the kafkaConsumer side, even if the kafkaConsumer is assigned these partitions again in a future rebalance.
>
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
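
The paused-flag loss described in the issue, and the snapshot-and-restore idea from suggestion 3, can be illustrated with a small toy model. This is only a sketch under stated assumptions: PausedFlagModel and its eagerRebalance method are hypothetical names invented for illustration, partitions are plain strings, and none of this is Kafka's actual SubscriptionState or ConsumerCoordinator code. The model assumes that assigning an empty set drops all per-partition state, which is the behaviour the issue reports for the EAGER protocol.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of per-partition paused flags; NOT Kafka code.
final class PausedFlagModel {
    // partition name -> paused flag
    private final Map<String, Boolean> assignment = new HashMap<>();

    // Replace the assignment. Partitions that stay assigned keep their flag
    // (an assumption of this sketch); newly assigned ones start unpaused.
    public void assignFromSubscribed(Set<String> partitions) {
        Map<String, Boolean> next = new HashMap<>();
        for (String tp : partitions) {
            next.put(tp, assignment.getOrDefault(tp, false));
        }
        assignment.clear();
        assignment.putAll(next);
    }

    public void pause(String tp) {
        if (!assignment.containsKey(tp)) {
            throw new IllegalStateException("not assigned: " + tp);
        }
        assignment.put(tp, true);
    }

    public boolean isPaused(String tp) {
        return assignment.getOrDefault(tp, false);
    }

    public Set<String> pausedPartitions() {
        Set<String> paused = new HashSet<>();
        assignment.forEach((tp, isPaused) -> { if (isPaused) paused.add(tp); });
        return paused;
    }

    // EAGER-style rebalance: revoke everything, then apply the new assignment.
    // With restorePaused = true, the paused set is snapshotted first and
    // re-applied to partitions that are still assigned afterwards.
    public void eagerRebalance(Set<String> newAssignment, boolean restorePaused) {
        Set<String> pausedBefore = pausedPartitions();
        assignFromSubscribed(new HashSet<>());   // all state, including flags, is dropped
        assignFromSubscribed(newAssignment);
        if (restorePaused) {
            for (String tp : pausedBefore) {
                if (assignment.containsKey(tp)) {
                    pause(tp);
                }
            }
        }
    }

    public static void main(String[] args) {
        Set<String> partitions = new HashSet<>();
        partitions.add("topic-0");

        PausedFlagModel model = new PausedFlagModel();
        model.assignFromSubscribed(partitions);
        model.pause("topic-0");

        model.eagerRebalance(partitions, false);
        System.out.println("paused after plain rebalance: " + model.isPaused("topic-0"));     // false

        model.pause("topic-0");
        model.eagerRebalance(partitions, true);
        System.out.println("paused after restoring rebalance: " + model.isPaused("topic-0")); // true
    }
}
```

As in the note in the issue, a partition that drops out of the new assignment loses its flag in this model even when restorePaused is true, because only partitions that are still assigned are paused again.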