When using camel-kafka, if the processor or a synchronous to() (for example to a direct: endpoint) takes too long (more than 30 s) to process an exchange, the Kafka consumer group gets rebalanced.
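The 30 s limit applies to all the time spent between two poll() calls. That means it covers every record returned by one poll(), not just a single record. A quick way to check a setup is to divide the time budget by the per-record processing time. The helper below is hypothetical (name, parameters and the safety margin are illustrative assumptions, not part of camel-kafka or kafka-clients). Newer clients (0.10.0 onward) also expose a `max.poll.records` consumer setting that caps how many records one poll() returns.

```java
// Hypothetical helper: how many records can one poll() safely return if each
// record takes roughly perRecordMs to process and the next poll() must happen
// before sessionTimeoutMs elapses? safetyMarginMs leaves room for the poll itself.
public class PollBudget {

    static long maxSafeRecordsPerPoll(long sessionTimeoutMs, long perRecordMs, long safetyMarginMs) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        long budget = sessionTimeoutMs - safetyMarginMs;
        // Integer division rounds down, so the whole batch fits inside the budget.
        return Math.max(0, budget / perRecordMs);
    }

    public static void main(String[] args) {
        // 30 s timeout, 5 s margin, 4 s per record: 25000 / 4000 = 6 records
        System.out.println(maxSafeRecordsPerPoll(30_000, 4_000, 5_000));
        // A single record taking 40 s never fits, whatever the batch size
        System.out.println(maxSafeRecordsPerPoll(30_000, 40_000, 5_000));
    }
}
```

A result of 0 means smaller batches cannot help: one record alone exceeds the budget, so processing has to move off the polling thread.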
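The issue is caused by the Kafka client sending its heartbeat to the broker from inside the poll() call. After poll() returns the ConsumerRecords, there is no separate thread sending heartbeats. If the processor takes so long that the next poll() happens more than 30 s later (the consumer's session.timeout.ms, 30 s by default in the 0.9/0.10.0 clients), the client is kicked out of the group and a rebalance occurs. This may make the route never succeed, because the uncommitted records are fetched again after the rebalance and can hit the same timeout. The same thing happens when a single poll() returns several ConsumerRecords and their combined processing time exceeds 30 s.

My suggestion is to process the exchange on a separate thread and, while waiting for it, pause the Kafka consumer and keep calling poll() so the client stays alive. Example (excerpt from a consumer loop; processorExecutor, lastPollTime, consumerFetcherStatus, records and putToList are defined elsewhere):

```java
// Run the Camel processor on a separate thread so this thread can keep polling.
Future<?> processorFuture = processorExecutor.submit(new Runnable() {
    @Override
    public void run() {
        try {
            processor.process(exchange);
        } catch (Exception e) {
            getExceptionHandler().handleException("Error during processing", exchange, e);
        }
    }
});

boolean processFinish = false;
while (!processFinish) {
    try {
        // Wait at most 25 s since the last poll, leaving a margin before the 30 s timeout
        LOG.debug("Waiting process to finish");
        processorFuture.get(25000 - (System.currentTimeMillis() - lastPollTime), TimeUnit.MILLISECONDS);
        processFinish = true;
    } catch (TimeoutException e) {
        if (consumerFetcherStatus) {
            // Pause all assigned partitions so the keep-alive poll fetches no new records
            // (0.9 client API; from 0.10.0 pause() takes a Collection: consumer.pause(consumer.assignment()))
            LOG.info("Pause consumer");
            consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
            consumerFetcherStatus = false;
        }
        LOG.info("Call poll(1) to make sure kafka client keep alive");
        records = consumer.poll(1);
        lastPollTime = System.currentTimeMillis();
        if (records.count() > 0) {
            // Partitions newly assigned during a rebalance are not paused yet
            LOG.info("There should be nothing, but may be some partition arrive on rebalance, try pause again");
            consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
            putToList(records);
        }
    } catch (ExecutionException e) {
        LOG.info("Encounter exception on waiting processor thread", e);
        processFinish = true; // the task has ended, so stop waiting on it
    } catch (InterruptedException e) {
        LOG.info("Interrupted while waiting for processor thread", e);
        Thread.currentThread().interrupt(); // preserve the interrupt status
        processFinish = true;
    }
}
// Resume the paused partitions (and reset consumerFetcherStatus) before the next regular poll.
```

Also note the effect of a rebalance on Kafka.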
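The core of the workaround is "wait for the work with a timeout, and on every timeout do a keep-alive". The sketch below isolates that loop so it runs without Kafka or Camel: `keepAlive` stands in for the pause-plus-poll(1) step, and the class and method names are hypothetical. It also simplifies the original by using a fixed wait interval instead of a deadline measured from the last poll time.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class KeepAliveWhileProcessing {

    /**
     * Runs work on the executor and, each time keepAliveIntervalMs passes without
     * the work finishing, calls keepAlive (in a real consumer: pause the assigned
     * partitions and do a short poll). Returns how many keep-alive calls were made.
     * An exception thrown by work is rethrown as ExecutionException.
     */
    static int runWithKeepAlive(ExecutorService executor, Runnable work,
                                Runnable keepAlive, long keepAliveIntervalMs)
            throws InterruptedException, ExecutionException {
        Future<?> future = executor.submit(work);
        int keepAliveCalls = 0;
        while (true) {
            try {
                future.get(keepAliveIntervalMs, TimeUnit.MILLISECONDS);
                return keepAliveCalls; // work finished
            } catch (TimeoutException e) {
                keepAlive.run(); // still running: keep the client alive and wait again
                keepAliveCalls++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Simulated slow processor: about 350 ms of work, keep-alive every 100 ms.
            int calls = runWithKeepAlive(executor,
                    () -> {
                        try {
                            Thread.sleep(350);
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                    },
                    () -> System.out.println("keep-alive (pause + poll)"),
                    100);
            System.out.println("keep-alive calls: " + calls);
        } finally {
            executor.shutdown();
        }
    }
}
```

Keeping the wait on the polling thread, rather than heartbeating from the worker, matters because the Kafka consumer is not safe for use from multiple threads.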
When a rebalance happens, all assigned topic partitions are revoked, and any messages not yet processed should be discarded, because their topic partitions may be reassigned to another client.

--
View this message in context: http://camel.465427.n5.nabble.com/Camel-kafka-error-on-too-long-on-synchronous-invocation-tp5782821.html
Sent from the Camel - Users mailing list archive at Nabble.com.
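A minimal sketch of discarding not-yet-processed records when partitions are revoked, assuming they sit in an in-memory buffer such as the one filled by `putToList` in the original example. `Partition` and `PendingRecord` are hypothetical stand-ins for Kafka's `TopicPartition` and `ConsumerRecord` so the code runs without kafka-clients. With the real client, `discardRevoked` would be called from `ConsumerRebalanceListener.onPartitionsRevoked(Collection<TopicPartition>)`, registered via `consumer.subscribe(topics, listener)`; those callbacks run inside poll(), so they also fire during the keep-alive poll(1) calls.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class PendingRecordBuffer {

    // Hypothetical stand-in for Kafka's TopicPartition.
    static final class Partition {
        final String topic;
        final int partition;
        Partition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Partition)) return false;
            Partition p = (Partition) o;
            return partition == p.partition && topic.equals(p.topic);
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
    }

    // Hypothetical stand-in for a buffered, not-yet-processed ConsumerRecord.
    static final class PendingRecord {
        final Partition partition;
        final long offset;
        PendingRecord(Partition partition, long offset) { this.partition = partition; this.offset = offset; }
    }

    private final List<PendingRecord> pending = new ArrayList<>();

    void add(PendingRecord r) { pending.add(r); }

    int size() { return pending.size(); }

    /** Drops every buffered record whose partition was revoked; returns how many were dropped. */
    int discardRevoked(Collection<Partition> revoked) {
        Set<Partition> revokedSet = new HashSet<>(revoked);
        int before = pending.size();
        pending.removeIf(r -> revokedSet.contains(r.partition));
        return before - pending.size();
    }

    public static void main(String[] args) {
        PendingRecordBuffer buffer = new PendingRecordBuffer();
        Partition p0 = new Partition("orders", 0);
        Partition p1 = new Partition("orders", 1);
        buffer.add(new PendingRecord(p0, 10));
        buffer.add(new PendingRecord(p0, 11));
        buffer.add(new PendingRecord(p1, 5));
        // Partition 0 is revoked: its two unprocessed records must not be processed here.
        int dropped = buffer.discardRevoked(Collections.singletonList(p0));
        System.out.println("dropped " + dropped + ", remaining " + buffer.size());
    }
}
```

Filtering by the revoked set also covers the case where every assigned partition is revoked: the buffer simply ends up empty.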