We have a custom Camel component named CamelRedisSeda, essentially an extension of UriEndpointComponent, which pops items from or pushes items to a Redis queue. For testing, both the Redis client and the connection are mocked and use a blocking queue as a replacement for the Redis queue.
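For illustration, a blocking-queue stand-in of this kind could look roughly like the sketch below. FakeRedisQueue and its method signatures are hypothetical names chosen for this example, not our actual mock. Note that the timeout is given in milliseconds here, whereas the real BLPOP takes seconds.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a single Redis list, backed by a blocking deque.
// This is an illustration only, not the mock used in our tests.
class FakeRedisQueue<V> {
    private final BlockingDeque<V> items = new LinkedBlockingDeque<>();

    // Mimics RPUSH: appends the values to the tail and returns the new length.
    @SafeVarargs
    final long rpush(V... values) {
        for (V value : values) {
            items.addLast(value);
        }
        return items.size();
    }

    // Mimics BLPOP: waits up to timeoutMillis for the head element.
    // Returns null on timeout or if the waiting thread is interrupted.
    V blpop(long timeoutMillis) {
        try {
            return items.pollFirst(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    // Mimics LLEN: number of elements currently queued.
    long llen() {
        return items.size();
    }
}
```

Returning null on timeout lets a consumer loop re-check its run flag between pops, which matters for the shutdown scenario described further down.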
I test the consumer of the component using the ConsumerTemplate like this:

    @Test
    public void testRedisSedaConsumerForMultipleAvailableMessages() throws Exception {
        String queueName = "redis-seda:q1";
        RedisConnection<String, RedisSedaPayload> con = redisClient.connect(new ObjectCodec<RedisSedaPayload>());
        con.rpush(queueName, new RedisSedaPayload("test"), new RedisSedaPayload("test2"), new RedisSedaPayload("test3"));

        ConsumerTemplate consumerTemplate = consumer();
        String body = consumerTemplate.receiveBody("redis-seda://q1?redisClient=#redisClient", 500, String.class);
        String body2 = consumerTemplate.receiveBody("redis-seda://q1?redisClient=#redisClient", 500, String.class);
        String body3 = consumerTemplate.receiveBody("redis-seda://q1?redisClient=#redisClient", 500, String.class);
        String body4 = consumerTemplate.receiveBody("redis-seda://q1?redisClient=#redisClient", 500, String.class);

        assertThat(body, is(equalTo("test")));
        assertThat(body2, is(equalTo("test2")));
        assertThat(body3, is(equalTo("test3")));
        assertThat(body4, is(nullValue()));
    }

This works quite well. However, while patching one of our servers recently, which also required a restart, we noticed that one of our messages got lost during the restart. To test this behavior I wanted to create a test case which fills the mocked queue with plenty of messages and shuts down the Camel context while the consumer pops messages from the queue.

Basically the test first creates 1000 entries which the consumer should consume. I then create a worker thread which stops the context (and the consumer) after the 5th message was processed. Each pop is currently mocked with a slight sleep delay so that the whole test needs a couple of seconds and the shutdown thread has a chance to perform its task.

    @Test
    public void testCamelShutdownWhileConsumingMessages() throws Exception {
        int maxItems = 1000;
        String queueName = "redis-seda:q1";
        RedisConnection<String, RedisSedaPayload> con = redisClient.connect(new ObjectCodec<RedisSedaPayload>());
        for (int i=1; i<maxItems+1; i++) {
            con.rpush(queueName, new RedisSedaPayload(i));
        }

        ConsumerTemplate consumerTemplate = consumer();
        CyclicBarrier barrier = new CyclicBarrier(2);
        camelShutdownThread(barrier).start();

        Exchange exchange = null;
        int lastProcessedItem = 0;
        do {
            try {
                if (5 == lastProcessedItem) {
                    LOG.debug("Consumer test waiting for synchronization point");
                    barrier.await();
                }
                exchange = consumerTemplate.receive("redis-seda://q1?redisClient=#redisClient", 50);
                if (null != exchange) {
                    lastProcessedItem = exchange.getIn().getBody(Integer.class);
                }
            } catch (Exception ex) {
                LOG.warn("Caught exception", ex);
                // extract the item number from the last processed exchange body
                if (null != exchange) {
                    lastProcessedItem = exchange.getIn().getBody(Integer.class);
                }
                exchange = null;
            }
        } while (null != exchange);

        LOG.info("Consumed items: {}, available items: {}", lastProcessedItem, con.llen(""));
        assertThat(lastProcessedItem + con.llen(""), is(equalTo((long)maxItems)));
    }

The shutdown thread started from within the test looks like this:

    private Thread camelShutdownThread(final CyclicBarrier barrier) throws Exception {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    LOG.debug("Shutdown thread waiting for synchronization point");
                    barrier.await();
                    LOG.info("Performing shutdown request");
                } catch (BrokenBarrierException | InterruptedException ex) {
                    LOG.warn("Caught exception " + ex.getLocalizedMessage(), ex);
                }
                try {
                    shutdownCamel();
                } catch (Exception ex) {
                    LOG.warn("Caught exception " + ex.getLocalizedMessage(), ex);
                }
            }
        });
    }

After the 5th element has been consumed and its exchange returned through the consumer template, both threads reach the barrier and proceed.
While the test continues to receive exchanges (or their bodies), the shutdown thread attempts to shut down the Camel context in order to mimic the behavior we noticed on live. However, for some reason, even though ServiceSupport internally uses AtomicBoolean to synchronize the internal state of the service among threads, the consumer thread keeps consuming messages from the queue. The stop method of the consumer is invoked only after the test is torn down, and I’m somehow unable to stop the consumer (or the context) prematurely and thereby fake the behavior of an application shutdown.

Our desired behavior is that any exchange already in flight should finish gracefully, while the component stops consuming further messages from the queue to avoid data loss.

The actual consumer looks like this:

    @Override
    protected void doStart() throws Exception {
        executor = endpoint.getCamelContext().getExecutorServiceManager()
                .newSingleThreadExecutor(this, endpoint.getEndpointUri());
        executor.execute(this);
        super.doStart();
    }

    @Override
    protected void doStop() throws Exception {
        LOG.info("Stop signal received for queue {}", queueName);
        super.stop();
        if (executor != null) {
            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
            executor.shutdown(); // does prevent further task from being added
            executor = null;
        }
        super.doStop();
    }

    @Override
    public void run() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Starting consumer for queue {}", endpoint.getConfiguration().getQueueName());
        }
        while (isRunAllowed()) {
            …
            KeyValue<String, RedisSedaPayload> popped = con.blpop(endpoint.getConfiguration().getPollTimeout(), queueName);
            if (popped != null) {
                RedisSedaPayload redisPayload = popped.value;
                exchange = this.getEndpoint().createExchange();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Consumed message with jobId {}", redisPayload.getId());
                }
                final Object body = redisPayload.getPayload();
                if (body instanceof DefaultExchangeHolder) {
                    DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) body);
                } else {
                    exchange.getIn().setBody(body);
                }
                try {
                    // process using the asynchronous routing engine
                    getAsyncProcessor().process(exchange, new AsyncCallback() {
                        public void done(boolean asyncDone) {
                            // noop
                        }
                    });
                    if (exchange.getException() != null) {
                        getExceptionHandler()
                                .handleException("Error processing exchange", exchange, exchange.getException());
                    }
                } catch (Exception e) {
                    LOG.error(CamelExchangeException.createExceptionMessage(
                            "Exception in Consumer upon when trying to process exchange", exchange, e));
                }
            }
            // further exception handling omitted
        }
        LOG.debug("Consuming from redis queue {} finished", queueName);
    }

I’m aware that the data loss occurred between the blocking pop and the hand-off to the AsyncProcessor: the message had already been consumed from the queue, but the exchange was no longer handled by Camel. In that case we could push the message back onto the Redis queue. However, we first want a test case that actually mimics the shutdown behavior noticed on live. Any tips on how to achieve this reliably?
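To make the re-push idea concrete, here is a minimal sketch of what I have in mind, stripped of Camel and Redis. Everything in it is an assumption for illustration: RequeueOnShutdownSketch, pollOnce and stop are hypothetical names, a BlockingDeque stands in for the Redis list, and an AtomicBoolean stands in for the run state that isRunAllowed() reports. It only narrows the window between pop and hand-off; a stop that arrives after the check but before processing is still not covered.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch: re-check the run flag after the blocking pop and
// push the message back to the head of the queue if a stop arrived meanwhile.
class RequeueOnShutdownSketch {
    private final BlockingDeque<String> queue;      // stand-in for the Redis list
    private final Consumer<String> processor;       // stand-in for the Camel processor
    private final AtomicBoolean running = new AtomicBoolean(true);

    RequeueOnShutdownSketch(BlockingDeque<String> queue, Consumer<String> processor) {
        this.queue = queue;
        this.processor = processor;
    }

    // Stand-in for the stop signal the consumer receives on shutdown.
    void stop() {
        running.set(false);
    }

    // One loop iteration. Returns true only if a message was handed to the processor.
    boolean pollOnce(long timeoutMillis) {
        String popped;
        try {
            popped = queue.pollFirst(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
        if (popped == null) {
            return false; // timed out, nothing consumed
        }
        if (!running.get()) {
            // Stop arrived while we were blocked in the pop:
            // put the message back at the head so ordering is preserved.
            queue.addFirst(popped);
            return false;
        }
        processor.accept(popped);
        return true;
    }
}
```

In a test, calling stop() between two pollOnce calls should leave the second message on the queue instead of dropping it, which is the invariant the assertion on consumed plus remaining items in the test above is meant to check.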