Ensure that status is updated properly.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c703c0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c703c0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c703c0

Branch: refs/heads/USERGRID-909
Commit: f8c703c02c1182ad63ad86587749eb1ae09c202a
Parents: 471dc35
Author: Dave Johnson <snoopd...@apache.org>
Authored: Thu Oct 29 17:57:32 2015 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Thu Oct 29 17:57:32 2015 -0400

----------------------------------------------------------------------
 .../rest/system/ConnectionResource.java         | 48 +++++++++++++-------
 1 file changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c703c0/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
----------------------------------------------------------------------
diff --git 
a/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
 
b/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
index 6e683ed..14b79f3 100644
--- 
a/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
+++ 
b/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
@@ -55,6 +55,7 @@ import com.google.common.base.Preconditions;
 import com.sun.jersey.api.json.JSONWithPadding;
 
 import rx.Observable;
+import rx.functions.Action1;
 import rx.schedulers.Schedulers;
 
 
@@ -146,36 +147,51 @@ public class ConnectionResource extends 
AbstractContextResource {
 
         //start de duping and run in the background
         connectionService.deDupeConnections( applicationScopeObservable 
).buffer( 10, TimeUnit.SECONDS, 1000 )
-                         .doOnNext( buffer -> {
+                         .doOnNext(buffer -> {
 
 
-                             final long runningTotal = count.addAndGet( 
buffer.size() );
+                             final long runningTotal = 
count.addAndGet(buffer.size());
 
                              final Map<String, Object> status = new 
HashMap<String, Object>() {{
-                                 put( "countProcessed", runningTotal );
-                                 put( "updatedTimestamp", 
System.currentTimeMillis() );
+                                 put("countProcessed", runningTotal);
+                                 put("updatedTimestamp", 
System.currentTimeMillis());
                              }};
 
-                             statusService.setStatus( 
CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId,
-                                 StatusService.Status.INPROGRESS, status 
).toBlocking().lastOrDefault( null );
-                         } ).doOnSubscribe( () -> {
-            statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, 
jobId, StatusService.Status.STARTED,
-                new HashMap<>() ).toBlocking().lastOrDefault( null );
-        } ).doOnCompleted( () -> {
+                             
statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId,
+                                 StatusService.Status.INPROGRESS, 
status).toBlocking().lastOrDefault(null);
+                         }).doOnSubscribe(() -> {
+
+            statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+                jobId, StatusService.Status.STARTED, new 
HashMap<>()).toBlocking().lastOrDefault(null);
+
+        }).doOnCompleted(() -> {
 
             final long runningTotal = count.get();
 
             final Map<String, Object> status = new HashMap<String, Object>() {{
-                put( "countProcessed", runningTotal );
-                put( "updatedTimestamp", System.currentTimeMillis() );
+                put("countProcessed", runningTotal);
+                put("updatedTimestamp", System.currentTimeMillis());
             }};
 
-            statusService
-                .setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, 
StatusService.Status.COMPLETE, status );
-        } ).subscribeOn( Schedulers.newThread() ).subscribe();
+            statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+                jobId, StatusService.Status.COMPLETE, 
status).toBlocking().lastOrDefault(null);
+
+        }).doOnError( (throwable) -> {
+            logger.error("Error deduping connections", throwable);
+
+            final Map<String, Object> status = new HashMap<String, Object>() {{
+                put("error", throwable.getMessage() );
+            }};
+
+            statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+                jobId, StatusService.Status.FAILED, 
status).toBlocking().lastOrDefault(null);;
+
+        } ).subscribeOn(Schedulers.newThread()).subscribe();
+
 
+        final StatusService.JobStatus status =
+            new StatusService.JobStatus( jobId, StatusService.Status.STARTED, 
new HashMap<>(  ) );
 
-        final StatusService.JobStatus status = new StatusService.JobStatus( 
jobId, StatusService.Status.STARTED, new HashMap<>(  ) );
         return createResult( status, callback );
     }
 

Reply via email to