We have a custom Camel component named CamelRedisSeda, essentially an 
extension of UriEndpointComponent, that pops items from or pushes items to a 
Redis queue. For testing, both the Redis client and the connection are mocked 
and use a blocking queue behind the scenes as a replacement for the Redis queue.

I test the consumer of the component using the ConsumerTemplate like this:

@Test
public void testRedisSedaConsumerForMultipleAvailableMessages() throws Exception {

  String queueName = "redis-seda:q1";
  RedisConnection<String, RedisSedaPayload> con =
      redisClient.connect(new ObjectCodec<RedisSedaPayload>());
  con.rpush(queueName,
            new RedisSedaPayload("test"),
            new RedisSedaPayload("test2"),
            new RedisSedaPayload("test3"));

  ConsumerTemplate consumerTemplate = consumer();
  String body = consumerTemplate.receiveBody(
      "redis-seda://q1?redisClient=#redisClient", 500, String.class);
  String body2 = consumerTemplate.receiveBody(
      "redis-seda://q1?redisClient=#redisClient", 500, String.class);
  String body3 = consumerTemplate.receiveBody(
      "redis-seda://q1?redisClient=#redisClient", 500, String.class);
  String body4 = consumerTemplate.receiveBody(
      "redis-seda://q1?redisClient=#redisClient", 500, String.class);

  assertThat(body, is(equalTo("test")));
  assertThat(body2, is(equalTo("test2")));
  assertThat(body3, is(equalTo("test3")));
  assertThat(body4, is(nullValue()));
}

This works quite well. However, while recently patching one of our servers, 
which also required a restart, we noticed that one of our messages got lost 
during the restart. To test this behavior, I wanted to create a test case that 
fills the mocked queue with plenty of messages and shuts down the Camel context 
while the consumer pops messages from the queue.

Basically, the test first creates 1000 entries which the consumer should 
consume. I then create a worker thread which stops the context (and the 
consumer) after the 5th message has been processed. Each pop is currently 
mocked with a slight sleep delay so that the whole test needs a couple of 
seconds and the shutdown thread has a chance to perform its task.

  @Test
  public void testCamelShutdownWhileConsumingMessages() throws Exception {
    int maxItems = 1000;
    String queueName = "redis-seda:q1";
    RedisConnection<String, RedisSedaPayload> con =
        redisClient.connect(new ObjectCodec<RedisSedaPayload>());
    for (int i=1; i<maxItems+1; i++) {
      con.rpush(queueName, new RedisSedaPayload(i));
    }

    ConsumerTemplate consumerTemplate = consumer();

    CyclicBarrier barrier = new CyclicBarrier(2);
    camelShutdownThread(barrier).start();

    Exchange exchange = null;
    int lastProcessedItem = 0;
    do {
      try {
        if (5 == lastProcessedItem) {
          LOG.debug("Consumer test waiting for synchronization point");
          barrier.await();
        }
        exchange = consumerTemplate.receive(
            "redis-seda://q1?redisClient=#redisClient", 50);
        if (null != exchange) {
          lastProcessedItem = exchange.getIn().getBody(Integer.class);
        }
      } catch (Exception ex) {
        LOG.warn("Caught exception", ex);
        // extract the item number from the last processed exchange body
        if (null != exchange) {
          lastProcessedItem = exchange.getIn().getBody(Integer.class);
        }
        exchange = null;
      }
    } while (null != exchange);

    LOG.info("Consumed items: {}, available items: {}", lastProcessedItem,
        con.llen(""));
    assertThat(lastProcessedItem + con.llen(""), is(equalTo((long)maxItems)));
  }

The shutdown thread started from within the test looks like this:

private Thread camelShutdownThread(final CyclicBarrier barrier) throws Exception {
  return new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        LOG.debug("Shutdown thread waiting for synchronization point");
        barrier.await();
        LOG.info("Performing shutdown request");
      } catch (BrokenBarrierException | InterruptedException ex) {
        LOG.warn("Caught exception " + ex.getLocalizedMessage(), ex);
      }
      try {
        shutdownCamel();
      } catch (Exception ex) {
        LOG.warn("Caught exception " + ex.getLocalizedMessage(), ex);
      }
    }
  });
}

After the 5th element has been consumed and its exchange returned through the 
consumer template, the barrier is tripped and both threads proceed. While the 
test continues to receive exchanges (or their bodies), the shutdown thread 
attempts to shut down the Camel context in order to mimic the behavior we 
noticed on the live system.

However, for some reason, even though ServiceSupport internally uses 
AtomicBoolean to synchronize the state of the service among threads, the 
consumer thread keeps consuming messages from the queue. The stop method of the 
consumer is invoked only after the test is torn down, and I’m somehow unable to 
stop the consumer (or the context) prematurely and thereby fake the behavior of 
an application shutdown.

Our desired behavior is that any available exchange should finish gracefully, 
while the component stops consuming further messages from the queue to avoid 
data loss. The actual consumer looks like this:

@Override
protected void doStart() throws Exception {
  executor = endpoint.getCamelContext().getExecutorServiceManager()
      .newSingleThreadExecutor(this, endpoint.getEndpointUri());
  executor.execute(this);
  super.doStart();
}

@Override
protected void doStop() throws Exception {
  LOG.info("Stop signal received for queue {}", queueName);
  super.stop();
  if (executor != null) {
    
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
    executor.shutdown(); // does prevent further task from being added
    executor = null;
  }
  super.doStop();
}

@Override
public void run() {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Starting consumer for queue {}",
        endpoint.getConfiguration().getQueueName());
  }

  while (isRunAllowed()) {
    …
    KeyValue<String, RedisSedaPayload> popped =
        con.blpop(endpoint.getConfiguration().getPollTimeout(), queueName);
    if (popped != null) {
      RedisSedaPayload redisPayload = popped.value;
      exchange = this.getEndpoint().createExchange();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Consumed message with jobId {}", redisPayload.getId());
      }
      final Object body = redisPayload.getPayload();
      if (body instanceof DefaultExchangeHolder) {
        DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) body);
      } else {
        exchange.getIn().setBody(body);
      }
      try {
        // process using the asynchronous routing engine
        getAsyncProcessor().process(exchange, new AsyncCallback() {
          public void done(boolean asyncDone) {
            // noop
          }
        });

        if (exchange.getException() != null) {
          getExceptionHandler()
              .handleException("Error processing exchange", exchange,
                               exchange.getException());
        }
      } catch (Exception e) {
        LOG.error(CamelExchangeException.createExceptionMessage(
            "Exception in Consumer upon when trying to process exchange",
            exchange, e));
      }
    }
    // further exception handling omitted
  }
  LOG.debug("Consuming from redis queue {} finished", queueName);
}

I’m aware that the data loss occurred between the blocking pop and the 
hand-over to the AsyncProcessor: the message had already been consumed from the 
queue, but the exchange was no longer handled by Camel. In that case we might 
push the message back onto the Redis queue. However, we first wanted a test 
case that actually mimics the shutdown behavior noticed on the live system.

Any tips on how to achieve this reliably?
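
For completeness, the idea of pushing the message back onto the queue could look roughly like the following sketch. It is again Camel-free and built on assumptions: RequeueOnStopSketch and pollOnce are hypothetical names, a LinkedBlockingDeque stands in for the Redis list, and addFirst plays the role of an LPUSH so the returned item keeps its original position.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for one iteration of the consumer loop.
class RequeueOnStopSketch {
    final BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    final AtomicBoolean running = new AtomicBoolean(true);

    // Pops one item; if a stop was requested in the meantime, the item is
    // put back at the head of the queue instead of being processed.
    boolean pollOnce(Consumer<String> processor) {
        String item;
        try {
            item = queue.pollFirst(50, TimeUnit.MILLISECONDS); // plays the role of BLPOP
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (item == null) {
            return false; // timeout, nothing to do
        }
        if (!running.get()) {
            queue.addFirst(item); // LPUSH equivalent: keep the original order
            return false;
        }
        processor.accept(item);
        return true;
    }
}
```

With this check in place, the number of processed items plus the number of items left in the queue stays equal to the number of items pushed, which is the invariant the shutdown test asserts. The sketch does not cover a stop that arrives after the check but before processing has finished.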


