Ensure that status is updated properly.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c703c0 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c703c0 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c703c0 Branch: refs/heads/USERGRID-909 Commit: f8c703c02c1182ad63ad86587749eb1ae09c202a Parents: 471dc35 Author: Dave Johnson <snoopd...@apache.org> Authored: Thu Oct 29 17:57:32 2015 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Thu Oct 29 17:57:32 2015 -0400 ---------------------------------------------------------------------- .../rest/system/ConnectionResource.java | 48 +++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c703c0/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java index 6e683ed..14b79f3 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java @@ -55,6 +55,7 @@ import com.google.common.base.Preconditions; import com.sun.jersey.api.json.JSONWithPadding; import rx.Observable; +import rx.functions.Action1; import rx.schedulers.Schedulers; @@ -146,36 +147,51 @@ public class ConnectionResource extends AbstractContextResource { //start de duping and run in the background connectionService.deDupeConnections( applicationScopeObservable ).buffer( 10, TimeUnit.SECONDS, 1000 ) - .doOnNext( buffer -> { + .doOnNext(buffer -> { - final long runningTotal = count.addAndGet( buffer.size() ); + final long runningTotal = count.addAndGet(buffer.size()); final Map<String, Object> status = new HashMap<String, Object>() {{ - put( "countProcessed", runningTotal ); - put( "updatedTimestamp", System.currentTimeMillis() ); + put("countProcessed", runningTotal); + put("updatedTimestamp", System.currentTimeMillis()); }}; - statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, - StatusService.Status.INPROGRESS, status ).toBlocking().lastOrDefault( null ); - } ).doOnSubscribe( () -> { - statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, StatusService.Status.STARTED, - new HashMap<>() ).toBlocking().lastOrDefault( null ); - } ).doOnCompleted( () -> { + statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, + StatusService.Status.INPROGRESS, status).toBlocking().lastOrDefault(null); + }).doOnSubscribe(() -> { + + statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID, + jobId, StatusService.Status.STARTED, new HashMap<>()).toBlocking().lastOrDefault(null); + + }).doOnCompleted(() -> { final long runningTotal = count.get(); final Map<String, Object> status = new HashMap<String, Object>() {{ - put( "countProcessed", runningTotal ); - put( "updatedTimestamp", System.currentTimeMillis() ); + put("countProcessed", runningTotal); + put("updatedTimestamp", System.currentTimeMillis()); }}; - statusService - .setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, StatusService.Status.COMPLETE, status ); - } ).subscribeOn( Schedulers.newThread() ).subscribe(); + statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID, + jobId, StatusService.Status.COMPLETE, status).toBlocking().lastOrDefault(null); + + }).doOnError( (throwable) -> { + logger.error("Error deduping connections", throwable); + + final Map<String, Object> status = new HashMap<String, Object>() {{ + put("error", throwable.getMessage() ); + }}; + + statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID, + jobId, StatusService.Status.FAILED, status).toBlocking().lastOrDefault(null);; + + } ).subscribeOn(Schedulers.newThread()).subscribe(); + + final StatusService.JobStatus status = + new StatusService.JobStatus( jobId, StatusService.Status.STARTED, new HashMap<>( ) ); - final StatusService.JobStatus status = new StatusService.JobStatus( jobId, StatusService.Status.STARTED, new HashMap<>( ) ); return createResult( status, callback ); }