Logging improvements
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1f28e2ab Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1f28e2ab Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1f28e2ab Branch: refs/heads/master Commit: 1f28e2abc46fe325bd861729f83541809cfc9ff1 Parents: 81233b0 Author: Dave Johnson <snoopd...@apache.org> Authored: Wed Oct 5 09:59:20 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Wed Oct 5 09:59:20 2016 -0400 ---------------------------------------------------------------------- .../impl/DistributedQueueServiceImpl.java | 47 ++++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/1f28e2ab/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index e24bdb4..af71247 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; +import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.TimeUnit; @@ -66,6 +67,9 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { this.queueManager = queueManager; this.qakkaFig = qakkaFig; this.messageCounterSerialization = messageCounterSerialization; + + + } @@ -74,20 +78,34 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { try { List<String> queues = queueManager.getListOfQueues(); - for ( String queueName : queues ) { + for (String queueName : queues) { initQueue( queueName ); } - }catch (InvalidQueryException e){ + } catch (InvalidQueryException e) { - if (e.getMessage().contains("unconfigured columnfamily")){ - logger.info("Unable to initialize queues since system is bootstrapping. " + - "Queues will be initialized when created"); - }else{ + if (e.getMessage().contains( "unconfigured columnfamily" )) { + logger.info( "Unable to initialize queues since system is bootstrapping. " + + "Queues will be initialized when created" ); + } else { throw e; } } + StringBuilder logMessage = new StringBuilder(); + logMessage.append( "DistributedQueueServiceImpl initialized with config:\n" ); + Method[] methods = qakkaFig.getClass().getMethods(); + for ( Method method : methods ) { + if ( method.getName().startsWith("get")) { + try { + logMessage.append(" ") + .append( method.getName().substring(3) ) + .append(" = ") + .append( method.invoke( qakkaFig ).toString() ).append("\n"); + } catch (Exception ignored ) {} + } + } + logger.info( logMessage.toString() ); } @@ -203,7 +221,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { ret.addAll( getNextMessagesInternal( queueName, count )); if ( ret.size() < count ) { - try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 5 ); } catch (Exception ignored) {} + try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {} } } @@ -226,7 +244,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { int retries = 0; QueueGetRequest request = new QueueGetRequest( queueName, count ); - while ( retries++ < maxRetries ) { + while ( ++retries < maxRetries ) { try { Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS ); @@ -243,17 +261,18 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { QueueGetResponse qprm = (QueueGetResponse)response; if ( qprm.isSuccess() ) { if (retries > 1) { - logger.debug( "getNextMessage SUCCESS after {} retries", retries ); + logger.debug( "getNextMessage {} SUCCESS after {} retries", queueName, retries ); } } + logger.debug("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size()); return qprm.getQueueMessages(); } else if ( response != null ) { - logger.debug("ERROR RESPONSE (1) popping queue, retrying {}", retries ); + logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, retries ); } else { - logger.debug("TIMEOUT popping to queue, retrying {}", retries ); + logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, retries ); } } else if ( responseObject instanceof ClientActor.ErrorResponse ) { @@ -263,16 +282,16 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { errorResponse.getMessage(), retries ); } else { - logger.debug("UNKNOWN RESPONSE popping queue, retrying {}", retries ); + logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, retries ); } } catch ( Exception e ) { - logger.debug("ERROR popping to queue, retrying " + retries, e ); + logger.debug("ERROR popping to queue " + queueName + " retrying " + retries, e ); } } throw new QakkaRuntimeException( - "Error getting from queue " + queueName + " after " + retries ); + "Error getting from queue " + queueName + " after " + retries + " tries"); }