Removing jclouds from the code base, along with the aopalliance dependency, since they are unused.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/550cde5b Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/550cde5b Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/550cde5b Branch: refs/heads/USERGRID-1246-MASTER Commit: 550cde5b38a4dbe6278fbf21a48293e4dfcf36a8 Parents: abe3515 Author: George Reyes <g...@apache.org> Authored: Fri Apr 29 15:47:10 2016 -0700 Committer: George Reyes <g...@apache.org> Committed: Mon May 2 10:50:49 2016 -0700 ---------------------------------------------------------------------- stack/core/pom.xml | 10 - stack/pom.xml | 138 +--------- stack/services/pom.xml | 51 ---- .../management/OrganizationProfile.java | 88 ------ .../services/queues/ImportQueueListener.java | 87 ------ .../services/queues/ImportQueueManager.java | 82 ------ .../services/queues/ImportQueueMessage.java | 79 ------ .../usergrid/services/queues/QueueListener.java | 274 ------------------- 8 files changed, 2 insertions(+), 807 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/550cde5b/stack/core/pom.xml ---------------------------------------------------------------------- diff --git a/stack/core/pom.xml b/stack/core/pom.xml index 42012d9..ded2a25 100644 --- a/stack/core/pom.xml +++ b/stack/core/pom.xml @@ -195,11 +195,6 @@ </dependency> <dependency> - <groupId>javax.persistence</groupId> - <artifactId>persistence-api</artifactId> - </dependency> - - <dependency> <groupId>com.fasterxml.uuid</groupId> <artifactId>java-uuid-generator</artifactId> </dependency> @@ -247,11 +242,6 @@ </dependency> <dependency> - <groupId>aopalliance</groupId> - <artifactId>aopalliance</artifactId> - </dependency> - - <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/usergrid/blob/550cde5b/stack/pom.xml 
---------------------------------------------------------------------- diff --git a/stack/pom.xml b/stack/pom.xml index 8ea424b..9e296f7 100644 --- a/stack/pom.xml +++ b/stack/pom.xml @@ -95,6 +95,7 @@ <!-- =================================================================== --> <!-- Properties: Dependency Settings --> + <!-- Properties: Dependency Settings --> <!-- =================================================================== --> <amber-version>0.22-incubating</amber-version> @@ -109,7 +110,6 @@ <jacoco.version>0.7.5.201505241946</jacoco.version> <jackson-version>1.9.9</jackson-version> <jackson-2-version>2.3.3</jackson-2-version> - <jclouds.version>1.9.0</jclouds.version> <jersey-version>2.21</jersey-version> <junit-version>4.12</junit-version> <log4j-version>1.2.16</log4j-version> @@ -264,18 +264,6 @@ </dependency> <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-core</artifactId> - <version>5.5.0</version> - <exclusions> - <exclusion> - <artifactId>commons-logging</artifactId> - <groupId>commons-logging</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.4.1</version> @@ -287,20 +275,6 @@ </exclusions> </dependency> - - - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-spring</artifactId> - <version>5.5.0</version> - </dependency> - - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-pool</artifactId> - <version>5.5.0</version> - </dependency> - <dependency> <groupId>org.apache.amber</groupId> <artifactId>amber-oauth2-common</artifactId> @@ -559,12 +533,6 @@ </dependency> <dependency> - <groupId>javax.persistence</groupId> - <artifactId>persistence-api</artifactId> - <version>1.0</version> - </dependency> - - <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.0.1</version> @@ -812,12 +780,6 @@ 
<groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${org.springframework.version}</version> - <exclusions> - <exclusion> - <groupId>aopalliance</groupId> - <artifactId>aopalliance</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> @@ -848,12 +810,6 @@ <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${org.springframework.version}</version> - <exclusions> - <exclusion> - <groupId>aopalliance</groupId> - <artifactId>aopalliance</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> @@ -881,7 +837,7 @@ <version>0.9.94</version> </dependency> - + <!-- Used for the asset mime type identification--> <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-core</artifactId> @@ -1041,96 +997,6 @@ </dependency> <dependency> - <groupId>org.apache.jclouds</groupId> - <artifactId>jclouds-blobstore</artifactId> - <version>${jclouds.version}</version> - <exclusions> - <!-- blows up our version of guice--> - <exclusion> - <groupId>com.google.inject.extensions</groupId> - <artifactId>guice-assistedinject</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.jclouds</groupId> - <artifactId>jclouds-core</artifactId> - <version>${jclouds.version}</version> - <exclusions> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - </exclusion> - - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - - <exclusion> - <groupId>aopalliance</groupId> - <artifactId>aopalliance</artifactId> - </exclusion> - - <!-- if this is uncommented out sends our service tier into bootloop --> - <!-- blows up our version of guice--> - <!--<exclusion>--> - <!--<groupId>com.google.inject.extensions</groupId>--> - <!--<artifactId>guice-assistedinject</artifactId>--> - <!--</exclusion>--> - </exclusions> - </dependency> - - <dependency> - <groupId>aopalliance</groupId> - 
<artifactId>aopalliance</artifactId> - <version>1.0</version> - </dependency> - - <dependency> - <groupId>org.apache.jclouds</groupId> - <artifactId>jclouds-allblobstore</artifactId> - <version>${jclouds.version}</version> - <exclusions> - <exclusion> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.jclouds.driver</groupId> - <artifactId>jclouds-netty</artifactId> - <version>${jclouds.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.jclouds.driver</groupId> - <artifactId>jclouds-apachehc</artifactId> - <version>${jclouds.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.jclouds.driver</groupId> - <artifactId>jclouds-log4j</artifactId> - <version>${jclouds.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.jclouds.driver</groupId> - <artifactId>jclouds-slf4j</artifactId> - <version>${jclouds.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> <groupId>org.apache.usergrid</groupId> <artifactId>usergrid-java-client</artifactId> <version>0.0.10-SNAPSHOT</version> http://git-wip-us.apache.org/repos/asf/usergrid/blob/550cde5b/stack/services/pom.xml ---------------------------------------------------------------------- diff --git a/stack/services/pom.xml b/stack/services/pom.xml index 06ae5d8..e80553b 100644 --- a/stack/services/pom.xml +++ b/stack/services/pom.xml @@ -260,57 +260,6 @@ </dependency> <dependency> - <groupId>org.apache.jclouds</groupId> - <artifactId>jclouds-blobstore</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.jclouds</groupId> - <artifactId>jclouds-core</artifactId> - <exclusions> - <exclusion> - <artifactId>guice-assistedinject</artifactId> - <groupId>com.google.inject.extensions</groupId> - </exclusion> - <exclusion> - 
<artifactId>guice</artifactId> - <groupId>com.google.inject</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.jclouds</groupId> - <artifactId>jclouds-allblobstore</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.jclouds.driver</groupId> - <artifactId>jclouds-netty</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.jclouds.driver</groupId> - <artifactId>jclouds-apachehc</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.jclouds.driver</groupId> - <artifactId>jclouds-log4j</artifactId> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.jclouds.driver</groupId> - <artifactId>jclouds-slf4j</artifactId> - </dependency> - - <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-core</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/usergrid/blob/550cde5b/stack/services/src/main/java/org/apache/usergrid/management/OrganizationProfile.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/OrganizationProfile.java b/stack/services/src/main/java/org/apache/usergrid/management/OrganizationProfile.java deleted file mode 100644 index 308a431..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/OrganizationProfile.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.management; - - -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.Table; - - -@Entity -@Table(name = "Organizations") -public class OrganizationProfile { - - @Id - private String name; - - @Column(name = "email") - private String email; - - // some stuff to collect - - @Column(name = "contact") - private String contact; - - @Column(name = "paypal") - private String paypal; - - - public OrganizationProfile() { - - } - - - public String getName() { - return name; - } - - - public void setName( String name ) { - this.name = name; - } - - - public String getEmail() { - return email; - } - - - public void setEmail( String email ) { - this.email = email; - } - - - public String getContact() { - return contact; - } - - - public void setContact( String contact ) { - this.contact = contact; - } - - - public String getPaypal() { - return paypal; - } - - - public void setPaypal( String paypal ) { - this.paypal = paypal; - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/550cde5b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java deleted file mode 100644 index 5fcca19..0000000 --- 
a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.services.queues; - - -import java.util.List; -import java.util.Properties; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.EntityManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.services.ServiceManagerFactory; - -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Singleton; - - -//TODO: make sure this is properly instantiated by guice -@Singleton -public class ImportQueueListener extends QueueListener { - - /** - * Initializes the QueueListener. Need to wire the factories up in guice. - */ - private static final Logger logger = LoggerFactory.getLogger( ImportQueueListener.class ); - - - public static String QUEUE_NAME = "import_v1"; - //TODO: someway to tell the base class what the queuename is. The scope would be different. 
- - @Inject - public ImportQueueListener( final ServiceManagerFactory smf, final EntityManagerFactory emf, - final Injector injector, final Properties props ) { - super( smf, emf, injector, props ); - } - - - /** - * Executes import specific functionality on the list of messages that was returned from the - * queue. - * @param messages - */ - @Override - public void onMessage( final List<QueueMessage> messages ) throws Exception { - /** - * Much like in the original queueListener , we need to translate the Messages that we get - * back from the QueueMessage into something like an Import message. The way that a - * notification does it is in line 163 of the notification QueueListener we take the body - * of the message and typecast it into a model called ApplicationQueueMessage. Then it does - * work on the message. - */ - if (logger.isTraceEnabled()) { - logger.trace("Doing work in onMessage in ImportQueueListener"); - } - for (QueueMessage message : messages) { - ImportQueueMessage queueMessage = ( ImportQueueMessage ) message.getBody(); - -// TODO We still need to hide this queue behind the scheduler importService.downloadAndImportFile( queueMessage ); - - } - - } - - //TODO: make this set from the properties file. Due to having a shared amazon account. 
- @Override - public String getQueueName() { - return QUEUE_NAME; - } - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/550cde5b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java deleted file mode 100644 index f3c65c7..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. The ASF licenses this file to You - * * under the Apache License, Version 2.0 (the "License"); you may not - * * use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. For additional information regarding - * * copyright in this work, please see the NOTICE file in the top level - * * directory of this distribution. 
- * - */ - -package org.apache.usergrid.services.queues; - - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueMessage; - - -/** - * Manages the queueManager implementation for Import - */ -public class ImportQueueManager implements QueueManager { - - @Override - public List<QueueMessage> getMessages(final int limit, final Class klass) { - return new ArrayList<>(); - } - - @Override - public long getQueueDepth() { - return 0; - } - - - @Override - public void commitMessage( final QueueMessage queueMessage ) { - - } - - - @Override - public void commitMessages( final List<QueueMessage> queueMessages ) { - - } - - - @Override - public void sendMessages( final List bodies ) throws IOException { - - } - - - @Override - public <T extends Serializable> void sendMessage( final T body ) throws IOException { - - } - - - @Override - public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { - - } - - @Override - public void deleteQueue() { - - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/550cde5b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueMessage.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueMessage.java deleted file mode 100644 index 9879eaa..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueMessage.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.services.queues; - - -import java.io.Serializable; -import java.util.UUID; - - -/** - * Deserializes the Import Message that gets stored in the returned QueueMessage and - * gets the message back. Currently based on ApplicationQueueMessage - */ -public class ImportQueueMessage implements Serializable { - /** - * Import specific identifiers here - */ - - //Needed to see what import job the Queue Message is a part of - private UUID fileId; - - //Needed to determine what file we are working on importing - private String fileName; - - private UUID applicationId; - - - public ImportQueueMessage(){ - } - - public ImportQueueMessage(UUID fileId, UUID applicationId ,String fileName){ - this.fileId = fileId; - this.applicationId = applicationId; - this.fileName = fileName; - } - - - public UUID getApplicationId() { - return applicationId; - } - - - public void setApplicationId( final UUID applicationId ) { - this.applicationId = applicationId; - } - - - public UUID getFileId() { - return fileId; - } - - - public void setFileId( final UUID fileId ) { - this.fileId = fileId; - } - - - public String getFileName() { - return fileName; - } - - public void setFileName( final String fileName ) { - this.fileName = fileName; - } -} 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/550cde5b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java deleted file mode 100644 index 9d95d87..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.usergrid.services.queues; - -import com.codahale.metrics.*; -import com.codahale.metrics.Timer; -import com.google.inject.Injector; - -import org.apache.usergrid.persistence.EntityManagerFactory; - -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.queue.*; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; -import org.apache.usergrid.services.ServiceManager; -import org.apache.usergrid.services.ServiceManagerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.PostConstruct; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - - -/** - * Listens to the SQS queue and polls it for more queue messages. Then hands the queue messages off to the - * QueueProcessorFactory - */ -public abstract class QueueListener { - public final int MESSAGE_TRANSACTION_TIMEOUT = 25 * 1000; - private final QueueManagerFactory queueManagerFactory; - - public long DEFAULT_SLEEP = 5000; - - private static final Logger logger = LoggerFactory.getLogger(QueueListener.class); - - private MetricsFactory metricsService; - - private ServiceManagerFactory smf; - - private EntityManagerFactory emf; - - - private Properties properties; - - - private ServiceManager svcMgr; - - private long sleepWhenNoneFound = 0; - - private long sleepBetweenRuns = 0; - - private ExecutorService pool; - private List<Future> futures; - - public final int MAX_THREADS = 2; - private Integer batchSize = 10; - private String queueName; - private int consecutiveCallsToRemoveDevices; - private Meter meter; - private Timer timer; - - /** - * Initializes the QueueListener. 
- * @param smf - * @param emf - * @param props - */ - public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Injector injector, Properties props){ - //TODO: change current injectors to use service module instead of CpSetup - this.queueManagerFactory = injector.getInstance( QueueManagerFactory.class ); - this.smf = smf; - this.emf = injector.getInstance( EntityManagerFactory.class ); //emf; - this.metricsService = injector.getInstance(MetricsFactory.class); - this.properties = props; - meter = metricsService.getMeter(QueueListener.class, "execute.commit"); - timer = metricsService.getTimer(QueueListener.class, "execute.dequeue"); - - } - - - /** - * Start the queueListener. Initializes queue settings before starting the queue. - */ //TODO: make use guice. Currently on spring. Needs to run and finish for main thread. - @PostConstruct - public void start(){ - boolean shouldRun = new Boolean(properties.getProperty("usergrid.queues.listener.run", "true")); - - if(shouldRun) { - if (logger.isTraceEnabled()) { - logger.trace("QueueListener: starting."); - } - int threadCount = 0; - - try { - sleepBetweenRuns = new Long(properties.getProperty("usergrid.queues.listener.sleep.between", ""+sleepBetweenRuns)).longValue(); - sleepWhenNoneFound = new Long(properties.getProperty("usergrid.queues.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue(); - batchSize = new Integer(properties.getProperty("usergrid.queues.listener.MAX_TAKE", (""+batchSize))); - consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.queues.inactive.interval", ""+200)); - queueName = getQueueName(); - - int maxThreads = new Integer(properties.getProperty("usergrid.queues.listener.maxThreads", ""+MAX_THREADS)); - - futures = new ArrayList<Future>(maxThreads); - - //create our thread pool based on our threadcount. 
- - pool = Executors.newFixedThreadPool(maxThreads); - - while (threadCount++ < maxThreads) { - if (logger.isTraceEnabled()) { - logger.trace("QueueListener: Starting thread {}.", threadCount); - } - Runnable task = new Runnable() { - @Override - public void run() { - try { - execute(); - } catch (Exception e) { - logger.warn("failed to start push", e); - } - } - }; - futures.add( pool.submit(task)); - } - } catch (Exception e) { - logger.error("QueueListener: failed to start:", e); - } - if (logger.isTraceEnabled()) { - logger.trace("QueueListener: done starting."); - } - }else{ - logger.info("QueueListener: never started due to config value usergrid.queues.listener.run."); - } - - } - - - /** - * Queue Processor - * Polls the queue for messages, then processes the messages that it gets. - */ - private void execute(){ - if(Thread.currentThread().isDaemon()) { - Thread.currentThread().setDaemon(true); - } - Thread.currentThread().setName("queues_Processor"+UUID.randomUUID()); - - final AtomicInteger consecutiveExceptions = new AtomicInteger(); - if (logger.isTraceEnabled()) { - logger.trace("QueueListener: Starting execute process."); - } - svcMgr = smf.getServiceManager(smf.getManagementAppId()); - if (logger.isTraceEnabled()) { - logger.trace("getting from queue {} ", queueName); - } - QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL); - QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope); - // run until there are no more active jobs - long runCount = 0; - - - while ( true ) { - - - Timer.Context timerContext = timer.time(); - //Get the messages out of the queue. - //TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here. 
- rx.Observable.from( queueManager.getMessages(getBatchSize(), ImportQueueMessage.class)) - .buffer(getBatchSize()) - .doOnNext(messages -> { - try { - if (logger.isTraceEnabled()) { - logger.trace("retrieved batch of {} messages from queue {} ", messages.size(), queueName); - } - - if (messages.size() > 0) { - - long now = System.currentTimeMillis(); - //TODO: make sure this has a way to determine which QueueListener needs to be used - // ideally this is done by checking which type the messages have then - // asking for a onMessage call. - onMessage(messages); - - queueManager.commitMessages(messages); - - meter.mark(messages.size()); - if (logger.isTraceEnabled()) { - logger.trace("sent batch {} messages duration {} ms", messages.size(), System.currentTimeMillis() - now); - } - - if (sleepBetweenRuns > 0) { - if (logger.isTraceEnabled()) { - logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns); - } - Thread.sleep(sleepBetweenRuns); - } - - } else { - if (logger.isTraceEnabled()) { - logger.trace("no messages...sleep...{}", sleepWhenNoneFound); - } - Thread.sleep(sleepWhenNoneFound); - } - timerContext.stop(); - //send to the providers - consecutiveExceptions.set(0); - } catch (Exception ex) { - logger.error("failed to dequeue", ex); - try { - long sleeptime = sleepWhenNoneFound * consecutiveExceptions.incrementAndGet(); - long maxSleep = 15000; - sleeptime = sleeptime > maxSleep ? 
maxSleep : sleeptime; - if (logger.isTraceEnabled()) { - logger.trace("sleeping due to failures {} ms", sleeptime); - } - Thread.sleep(sleeptime); - } catch (InterruptedException ie) { - if (logger.isTraceEnabled()) { - logger.trace("sleep interrupted"); - } - } - } - }).toBlocking().lastOrDefault(null); - } - } - - - public void stop(){ - if (logger.isTraceEnabled()) { - logger.trace("stop processes"); - } - - if(futures == null){ - return; - } - for(Future future : futures){ - future.cancel(true); - } - - pool.shutdownNow(); - } - - - public void setBatchSize(int batchSize){ - this.batchSize = batchSize; - } - public int getBatchSize(){return batchSize;} - - - /** - * This will be the method that does the job dependant execution. - * @param messages - */ - public abstract void onMessage(List<QueueMessage> messages) throws Exception; - - public abstract String getQueueName(); - -}