When using camel-kafka, if the processor or a synchronous `to` endpoint
(such as direct) takes too long (more than 30s) to process an exchange, it
causes a Kafka consumer group rebalance.

The issue is caused by the Kafka client sending heartbeats to the broker
only inside the poll() call. Once poll() has returned the ConsumerRecords,
there is no separate thread sending heartbeats. If the processor spends so
long on an exchange that the next poll happens more than 30s later (the
consumer session timeout), the client is kicked out of the consumer group
and a rebalance occurs. This may mean the route never succeeds, likely
because the same records are redelivered and hit the same timeout again.
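
To make the timing concrete, here is a minimal, broker-free model of the
behavior described above, assuming heartbeats are sent only from poll() (as
in the 0.9/0.10.0 clients; KIP-62 in 0.10.1 moved heartbeats to a background
thread). The class and method names are hypothetical; the coordinator is
modeled as seeing one heartbeat per poll and evicting the member when any
gap exceeds the session timeout.

```java
// Hypothetical, simplified model: one heartbeat per poll(), nothing in between.
class HeartbeatModel {

    /** Returns true if any gap between consecutive polls exceeds the session timeout. */
    static boolean wouldBeEvicted(long[] pollTimesMs, long sessionTimeoutMs) {
        for (int i = 1; i < pollTimesMs.length; i++) {
            long gap = pollTimesMs[i] - pollTimesMs[i - 1];
            if (gap > sessionTimeoutMs) {
                // No heartbeat for longer than the timeout: member removed, rebalance
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30_000; // the 30s threshold mentioned in the post
        // Polls at t=0s, 10s, 45s: the second exchange took 35s to process
        System.out.println(wouldBeEvicted(new long[] {0L, 10_000L, 45_000L}, sessionTimeoutMs));
        // Polls at t=0s, 10s, 35s: every gap stays under 30s
        System.out.println(wouldBeEvicted(new long[] {0L, 10_000L, 35_000L}, sessionTimeoutMs));
    }
}
```

The first call prints `true` and the second `false`: only the gap between
polls matters, not how long the consumer has been running.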

The same happens if a single poll returns multiple ConsumerRecords and the
total time to process all of them exceeds 30s.
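
The arithmetic behind the batch case: with heartbeats only in poll(), the
time until the next poll is the sum of the processing times of every record
the previous poll returned. The helper below is a hypothetical sketch of
that calculation, plus the largest batch that still fits under the timeout
for a fixed per-record cost.

```java
import java.util.Arrays;

// Hypothetical helpers: sequential processing of one poll's records.
class BatchTiming {

    /** Total time between two polls when each record is processed in turn. */
    static long batchProcessingMs(long[] perRecordMs) {
        long total = 0;
        for (long ms : perRecordMs) {
            total += ms;
        }
        return total;
    }

    /** True if processing the whole batch pushes the next poll past the timeout. */
    static boolean exceedsTimeout(long[] perRecordMs, long sessionTimeoutMs) {
        return batchProcessingMs(perRecordMs) > sessionTimeoutMs;
    }

    /** Largest batch size whose total time does not exceed the timeout (no safety margin). */
    static long maxSafeRecords(long perRecordMs, long sessionTimeoutMs) {
        return sessionTimeoutMs / perRecordMs;
    }

    public static void main(String[] args) {
        // Ten records at 4s each: no single record is slow, but 10 x 4s = 40s
        long[] batch = new long[10];
        Arrays.fill(batch, 4_000L);
        System.out.println(batchProcessingMs(batch));
        System.out.println(exceedsTimeout(batch, 30_000L));
        System.out.println(maxSafeRecords(4_000L, 30_000L));
    }
}
```

This prints `40000`, `true` and `7`. In practice you would leave a margin
below the timeout; clients from 0.10.0 on also offer the `max.poll.records`
consumer setting to cap how many records one poll returns.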

A suggested workaround is to process the exchange in a separate thread and,
if processing runs long, pause the Kafka consumer while keeping it polling.

Example:
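// Run the processor on a separate thread so this thread can keep polling
Future<?> processorFuture = processorExecutor.submit(new Runnable() {
    @Override
    public void run() {
        try {
            processor.process(exchange);
        } catch (Exception e) {
            getExceptionHandler().handleException("Error during processing", exchange, e);
        }
    }
});

boolean processFinish = false;
while (!processFinish) {
    try {
        // Wait at most 25s since the last poll, staying below the 30s session timeout
        LOG.debug("Waiting for processing to finish");
        processorFuture.get(25000 - (System.currentTimeMillis() - lastPollTime),
                TimeUnit.MILLISECONDS);
        processFinish = true;
    } catch (TimeoutException e) {
        // Still processing: pause all assigned partitions so poll() fetches no new records.
        // pause(TopicPartition...) is the 0.9.x client signature; 0.10+ takes a Collection.
        if (consumerFetcherStatus) {
            LOG.info("Pause consumer");
            consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
            consumerFetcherStatus = false;
        }

        LOG.info("Call poll(1) to keep the Kafka client alive");
        records = consumer.poll(1);
        lastPollTime = System.currentTimeMillis();
        if (records.count() > 0) {
            // Partitions newly assigned during a rebalance are not paused yet
            LOG.info("There should be nothing, but some partitions may have been "
                    + "assigned on rebalance; pausing again");
            consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
            putToList(records);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting instead of spinning
        Thread.currentThread().interrupt();
        LOG.info("Interrupted while waiting for processor thread", e);
        processFinish = true;
    } catch (ExecutionException e) {
        // The task ended abnormally; get() would keep throwing, so stop waiting
        LOG.info("Encountered exception while waiting for processor thread", e);
        processFinish = true;
    }
}
// Partitions may still be paused here; resume them (and reset
// consumerFetcherStatus) before going back to normal polling.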
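
The wait loop above can be exercised without a broker. The sketch below is a
generic version of the same pattern using only java.util.concurrent: submit
the work, wait in fixed slices with Future.get(timeout), and run a keep-alive
step each time a slice expires. `runWithKeepAlive` and the `keepAlive`
callback are hypothetical; `keepAlive` stands in for the "pause partitions
and call poll()" step, which in the real code must stay on the polling
thread because KafkaConsumer is not thread-safe.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical generic helper mirroring the wait loop in the post.
class KeepAliveWait {

    static <T> T runWithKeepAlive(ExecutorService executor, Callable<T> work,
                                  long sliceMs, Runnable keepAlive)
            throws InterruptedException, ExecutionException {
        Future<T> future = executor.submit(work);
        while (true) {
            try {
                // Wait at most one slice for the work to finish
                return future.get(sliceMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Work still running: do the keep-alive step, then wait again
                keepAlive.run();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger keepAlives = new AtomicInteger();
        try {
            String result = runWithKeepAlive(executor, () -> {
                Thread.sleep(300); // simulated slow processor
                return "done";
            }, 50, keepAlives::incrementAndGet);
            System.out.println(result + " after " + keepAlives.get() + " keep-alive calls");
        } finally {
            executor.shutdown();
        }
    }
}
```

Unlike the loop in the post, this version lets InterruptedException and
ExecutionException propagate to the caller, so a failed task cannot leave
the loop spinning.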


Also be careful with rebalancing in Kafka. When a rebalance happens, all
assigned topic partitions are revoked, and any messages not yet processed
should be discarded, because their topic partitions may be reassigned to
another client.
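
A minimal sketch of discarding unprocessed messages on revocation, assuming
fetched records are buffered (as `putToList` does above) until processed.
`PendingBuffer` is hypothetical and identifies partitions by plain
"topic-partition" strings; with the real client the natural place to call
such logic is `ConsumerRebalanceListener.onPartitionsRevoked`, which
receives a `Collection<TopicPartition>`. The topic name `orders` is made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical buffer of fetched-but-not-yet-processed records.
class PendingBuffer {

    static final class Pending {
        final String partition; // e.g. "orders-0"
        final long offset;

        Pending(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final List<Pending> pending = new ArrayList<>();

    void add(String partition, long offset) {
        pending.add(new Pending(partition, offset));
    }

    /** Drops every buffered record whose partition was revoked; returns how many were dropped. */
    int onPartitionsRevoked(Collection<String> revoked) {
        int before = pending.size();
        pending.removeIf(p -> revoked.contains(p.partition));
        return before - pending.size();
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingBuffer buffer = new PendingBuffer();
        buffer.add("orders-0", 10);
        buffer.add("orders-0", 11);
        buffer.add("orders-1", 5);
        // orders-0 is revoked: its unprocessed records may now go to another client
        int dropped = buffer.onPartitionsRevoked(Arrays.asList("orders-0"));
        System.out.println(dropped + " dropped, " + buffer.size() + " kept");
    }
}
```

Since the rebalance described in the post revokes every assigned partition,
in that case the whole buffer is emptied.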

--
View this message in context: 
http://camel.465427.n5.nabble.com/Camel-kafka-error-on-too-long-on-synchronous-invocation-tp5782821.html
Sent from the Camel - Users mailing list archive at Nabble.com.