Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 522a5515b -> 2a8679512


Minor formatting, logging and changes recommended by IntelliJ


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ae164586
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ae164586
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ae164586

Branch: refs/heads/usergrid-1318-queue
Commit: ae16458699a3f9d6b8ea9e2f4b851bb87b39558a
Parents: 522a551
Author: Dave Johnson <snoopd...@apache.org>
Authored: Wed Nov 9 10:43:00 2016 -0500
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Wed Nov 9 10:43:00 2016 -0500

----------------------------------------------------------------------
 .../asyncevents/AsyncEventService.java          | 11 ++++---
 .../actorsystem/ActorSystemManagerImpl.java     | 12 ++++----
 .../persistence/actorsystem/ClientActor.java    |  2 +-
 .../persistence/core/CassandraConfigImpl.java   |  6 ++++
 .../distributed/actors/QueueActorRouter.java    |  2 +-
 .../qakka/distributed/actors/QueueSender.java   |  4 +--
 .../distributed/actors/QueueTimeouter.java      |  5 +++-
 .../qakka/distributed/actors/QueueWriter.java   |  7 +++--
 .../impl/DistributedQueueServiceImpl.java       | 15 +++++++---
 .../impl/MessageCounterSerializationImpl.java   | 30 ++++++++++----------
 10 files changed, 58 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index ec84a0a..cab4e3e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -43,9 +43,11 @@ public interface AsyncEventService extends ReIndexAction {
     void queueInitializeApplicationIndex( final ApplicationScope 
applicationScope );
 
     /**
-     * Queue an entity to be indexed.  This will start processing immediately. 
For implementations that are realtime (akka, in memory)
-     * We will return a distributed future.  For SQS impls, this will return 
immediately, and the result will not be available.
+     * Queue an entity to be indexed.  This will start processing immediately.
+     * For implementations that are realtime (akka, in memory) We will return 
a distributed future.
+     * For SQS impls, this will return immediately, and the result will not be 
available.
      * After SQS is removed, the tests should be enhanced to ensure that we're 
processing our queues correctly.
+     *
      * @param applicationScope
      * @param entity The entity to index.  Should be fired when an entity is 
updated
      * @param updatedAfter
@@ -54,9 +56,10 @@ public interface AsyncEventService extends ReIndexAction {
 
 
     /**
-     * Fired when a new edge is added to an entity. Such as initial entity 
creation, adding to a collection, or creating a connection
+     * Fired when a new edge is added to an entity. Such as initial entity 
creation,
+     * adding to a collection, or creating a connection
      *
-     * TODO: We shouldn't take an entity here, only the id.  It doesn't make 
sense in a distributed context to pass the entity
+     * TODO: We shouldn't take an entity here, only the id. It doesn't make 
sense in a distributed context
      *
      * @param applicationScope
      * @param entity

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 96ebe69..c2f96af 100644
--- 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -396,7 +396,7 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
 
 
     /**
-     * Create RequestActor for each region.
+     * Create ClientActor for each region.
      */
     private void createClientActors( ActorSystem system ) {
 
@@ -438,7 +438,7 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
 
     private void waitForClientActor( ActorRef ra ) {
 
-        logger.info( "Waiting on RequestActor [{}]...", ra.path() );
+        logger.info( "Waiting on ClientActor [{}]...", ra.path() );
 
         started = false;
 
@@ -455,19 +455,19 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
                     started = true;
                     break;
                 }
-                logger.info( "Waiting for RequestActor [{}] region [{}] for 
[{}s]", ra.path(), currentRegion, retries );
+                logger.info( "Waiting for ClientActor [{}] region [{}] for 
[{}s]", ra.path(), currentRegion, retries );
                 Thread.sleep( 1000 );
 
             } catch (Exception e) {
-                logger.error( "Error: Timeout waiting for RequestActor [{}]", 
ra.path() );
+                logger.error( "Error: Timeout waiting for ClientActor [{}]", 
ra.path() );
             }
             retries++;
         }
 
         if (started) {
-            logger.info( "RequestActor [{}] has started", ra.path() );
+            logger.info( "ClientActor [{}] has started", ra.path() );
         } else {
-            throw new RuntimeException( "RequestActor ["+ra.path()+"] did not 
start in time" );
+            throw new RuntimeException( "ClientActor ["+ra.path()+"] did not 
start in time" );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java
 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java
index c553a31..dba8c2e 100644
--- 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java
+++ 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java
@@ -82,7 +82,7 @@ public class ClientActor extends UntypedActor {
             ActorSelection service = getContext().actorSelection( address + 
routerPath );
             service.tell( message, getSender() );
 
-        } else if ( routerPath != null && !ready ) {
+        } else if ( routerPath != null ) {
 
             logger.debug("{} responding with status unknown", name);
             getSender().tell( new ErrorResponse("ClientActor not ready"), 
getSender() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index 1503093..b0ce9e3 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -27,6 +27,8 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.model.ConsistencyLevel;
 import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -35,6 +37,7 @@ import 
org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
  */
 @Singleton
 public class CassandraConfigImpl implements CassandraConfig {
+    private static final Logger logger = LoggerFactory.getLogger( 
CassandraConfigImpl.class );
 
     private CassandraFig cassandraFig;
 
@@ -79,6 +82,9 @@ public class CassandraConfigImpl implements CassandraConfig {
               cassandraFig.getApplicationLocalKeyspace() + "_"
             + cassandraFig.getLocalDataCenter().replace("-", "_");
 
+        logger.info("Application Keyspace: {}", applicationKeyspace);
+        logger.info("Application Local Keyspace: {}", 
applicationLocalKeyspace);
+
         //add the listeners to update the values
         cassandraFig.addPropertyChangeListener( new PropertyChangeListener() {
             @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 71cf332..1ff8502 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -114,7 +114,7 @@ public class QueueActorRouter extends UntypedActor {
 
             if ( timeoutSchedulersByQueueName.get( queueName ) == null) {
                 Cancellable scheduler = 
getContext().system().scheduler().schedule(
-                    Duration.create( 0, TimeUnit.MILLISECONDS ),
+                    Duration.create( 0, TimeUnit.SECONDS ),
                     Duration.create( qakkaFig.getQueueTimeoutSeconds() / 2, 
TimeUnit.SECONDS ),
                     self(),
                     new QueueTimeoutRequest( queueName ),

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 461c28f..f584474 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -129,7 +129,7 @@ public class QueueSender extends UntypedActor {
 
                     if (actorSystemManager.getCurrentRegion().equals( 
destRegion )) {
 
-                        logger.trace("Sending queue {} message to local region 
{}", queueName, destRegion );
+                        logger.trace("{}: Sending queue {} message to local 
region {}", name, queueName, destRegion );
 
                         // send to current region via local clientActor
                         ActorRef clientActor = 
actorSystemManager.getClientActor();
@@ -137,7 +137,7 @@ public class QueueSender extends UntypedActor {
 
                     } else {
 
-                        logger.trace("Sending queue {} message to remote 
region {}", queueName, destRegion );
+                        logger.trace("{} Sending queue {} message to remote 
region {}", name, queueName, destRegion );
 
                         // send to remote region via cluster client for that 
region
                         ActorRef clusterClient = 
actorSystemManager.getClusterClient( destRegion );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 58afc76..546e638 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -22,6 +22,7 @@ package 
org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -43,6 +44,8 @@ import java.util.UUID;
 public class QueueTimeouter extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( 
QueueTimeouter.class );
 
+    private final String name = RandomStringUtils.randomAlphanumeric( 4 );
+
     private final QueueMessageSerialization messageSerialization;
     private final MetricsService            metricsService;
     private final ActorSystemFig            actorSystemFig;
@@ -105,7 +108,7 @@ public class QueueTimeouter extends UntypedActor {
                 }
 
                 if (count > 0) {
-                    logger.debug( "Timed out {} messages for queue {}", count, 
queueName );
+                    logger.debug( "{}: Timed out {} messages for queue {}", 
name, count, queueName );
                 }
 
             } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index c9be47f..b7f5401 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -22,6 +22,7 @@ package 
org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -45,6 +46,8 @@ public class QueueWriter extends UntypedActor {
 
     public enum WriteStatus { SUCCESS_XFERLOG_DELETED, 
SUCCESS_XFERLOG_NOTDELETED, ERROR };
 
+    private final String name = RandomStringUtils.randomAlphanumeric( 4 );
+
     private final QueueMessageSerialization messageSerialization;
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
@@ -97,8 +100,8 @@ public class QueueWriter extends UntypedActor {
 
                     messageSerialization.writeMessage( dbqm );
 
-                    //logger.debug("Wrote queue message id {} to queue name 
{}",
-                    //        dbqm.getQueueMessageId(), dbqm.getQueueName());
+                    logger.trace("{}: Wrote queue message id {} to queue name 
{}",
+                            name, dbqm.getQueueMessageId(), 
dbqm.getQueueName());
 
                 } catch (Throwable t) {
                     logger.debug("Error creating database queue message", t);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 96e8cab..1784dc3 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -47,6 +47,7 @@ import scala.concurrent.Future;
 import java.lang.reflect.Method;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 
 @Singleton
@@ -269,7 +270,7 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
                         logger.debug("ERROR RESPONSE (1) popping queue {}, 
retrying {}", queueName, tries );
 
                     } else {
-                        logger.debug("TIMEOUT popping from queue {}, retrying 
{}", queueName, tries );
+                        logger.trace("TIMEOUT popping from queue {}, retrying 
{}", queueName, tries );
                     }
 
                 } else if ( responseObject instanceof 
ClientActor.ErrorResponse ) {
@@ -282,8 +283,11 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
                     logger.debug("UNKNOWN RESPONSE popping queue {}, retrying 
{}", queueName, tries );
                 }
 
+            } catch ( TimeoutException e ) {
+                logger.trace("TIMEOUT popping to queue " + queueName + " 
retrying " + tries, e );
+
             } catch ( Exception e ) {
-                logger.error("ERROR popping to queue " + queueName + " 
retrying " + tries, e );
+                logger.debug("ERROR popping to queue " + queueName + " 
retrying " + tries, e );
             }
         }
 
@@ -335,12 +339,15 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
                     return qprm.getStatus();
 
                 } else if ( response != null  ) {
-                    logger.debug("ERROR RESPONSE sending message, retrying 
{}", retries );
+                    logger.debug("UNKNOWN RESPONSE sending message, retrying 
{}", retries );
 
                 } else {
-                    logger.debug("TIMEOUT sending message, retrying {}", 
retries );
+                    logger.trace("TIMEOUT sending message, retrying {}", 
retries );
                 }
 
+            } catch ( TimeoutException e ) {
+                logger.trace( "TIMEOUT sending message, retrying " + retries, 
e );
+
             } catch ( Exception e ) {
                 logger.debug("ERROR sending message, retrying " + retries, e );
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 769f584..2e7722d 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -57,15 +57,15 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
     private final CassandraConfig cassandraConfig;
     private final long writeTimeout;
 
-    final static String TABLE_MESSAGE_COUNTERS = "message_counters";
-    final static String COLUMN_QUEUE_NAME      = "queue_name";
-    final static String COLUMN_COUNTER_VALUE   = "counter_value";
-    final static String COLUMN_MESSAGE_TYPE    = "message_type";
+    private final static String TABLE_MESSAGE_COUNTERS = "message_counters";
+    private final static String COLUMN_QUEUE_NAME      = "queue_name";
+    private final static String COLUMN_COUNTER_VALUE   = "counter_value";
+    private final static String COLUMN_MESSAGE_TYPE    = "message_type";
 
     // design note: counters based on DataStax example here:
     // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
 
-    static final String CQL =
+    private static final String CQL =
         "CREATE TABLE IF NOT EXISTS message_counters ( " +
                 "counter_value counter, " +
                 "queue_name    varchar, " +
@@ -75,11 +75,11 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
 
 
     /** number of changes since last save to database */
-    final AtomicInteger numChanges = new AtomicInteger( 0 );
+    private final AtomicInteger numChanges = new AtomicInteger( 0 );
 
-    final long maxChangesBeforeSave;
+    private final long maxChangesBeforeSave;
 
-    class InMemoryCount {
+    private class InMemoryCount {
         long baseCount;
         final AtomicLong totalInMemoryCount = new AtomicLong( 0L ); // for 
testing using only in-memory counter
         final AtomicLong increment = new AtomicLong( 0L );
@@ -103,7 +103,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
         public long getDecrement() {
             return decrement.get();
         }
-        public void clearDeltas() {
+        private void clearDeltas() {
             this.increment.set( 0L );
             this.decrement.set( 0L );
         }
@@ -125,7 +125,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
         }
     }
 
-    private Map<String, InMemoryCount> inMemoryCounters = new 
ConcurrentHashMap<>(200);
+    private final Map<String, InMemoryCount> inMemoryCounters = new 
ConcurrentHashMap<>(200);
 
 
     @Inject
@@ -174,7 +174,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
         if ( logger.isDebugEnabled() ) {
             long value = inMemoryCounters.get( key ).value();
             if (value <= 0) {
-                logger.debug( "Queue {} type {} decremented count = {}", 
queueName, type, value );
+                logger.debug( "Queue {} type {} incremented {} count = {}", 
queueName, type, increment, value );
             }
         }
     }
@@ -200,7 +200,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
             }
         }
 
-        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        final InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
         synchronized ( inMemoryCount ) {
             inMemoryCount.decrement( decrement );
@@ -285,7 +285,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
     }
 
 
-    void incrementCounterInStorage( String queueName, 
DatabaseQueueMessage.Type type, long increment ) {
+    private void incrementCounterInStorage( String queueName, 
DatabaseQueueMessage.Type type, long increment ) {
 
         Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
                 .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
@@ -295,7 +295,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
     }
 
 
-    void decrementCounterInStorage( String queueName, 
DatabaseQueueMessage.Type type, long decrement ) {
+    private void decrementCounterInStorage( String queueName, 
DatabaseQueueMessage.Type type, long decrement ) {
 
         Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
             .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
@@ -305,7 +305,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
     }
 
 
-    Long retrieveCounterFromStorage( String queueName, 
DatabaseQueueMessage.Type type ) {
+    private Long retrieveCounterFromStorage( String queueName, 
DatabaseQueueMessage.Type type ) {
 
         Statement query = QueryBuilder.select().from( TABLE_MESSAGE_COUNTERS )
                 .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )

Reply via email to