remove observable
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/32d35e7d Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/32d35e7d Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/32d35e7d Branch: refs/heads/master Commit: 32d35e7d5e605005b3f530e3a1e390d394c29c0a Parents: f6409ce Author: Shawn Feldman <sfeld...@apache.org> Authored: Tue Oct 13 10:42:58 2015 -0600 Committer: Shawn Feldman <sfeld...@apache.org> Committed: Tue Oct 13 10:42:58 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 109 ++++++++++--------- 1 file changed, 55 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/32d35e7d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 6f41563..0fef974 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -259,65 +259,66 @@ public class AmazonAsyncEventService implements AsyncEventService { logger.debug("callEventHandlers with {} message", messages.size()); } - Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> { - AsyncEvent event = null; - try { - event = (AsyncEvent) message.getBody(); - } catch (ClassCastException cce) { - logger.error("Failed to deserialize message body", cce); - } - - if (event == null) { - logger.error("AsyncEvent type or event is null!"); - return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis()); - } - - final AsyncEvent thisEvent = event; - if (logger.isDebugEnabled()) { - logger.debug("Processing {} event", event); - } - - try { - Observable<IndexOperationMessage> indexoperationObservable; - //merge each operation to a master observable; - if (event instanceof EdgeDeleteEvent) { - indexoperationObservable = handleEdgeDelete(message); - } else if (event instanceof EdgeIndexEvent) { - indexoperationObservable = handleEdgeIndex(message); - } else if (event instanceof EntityDeleteEvent) { - indexoperationObservable = handleEntityDelete(message); - } else if (event instanceof EntityIndexEvent) { - indexoperationObservable = handleEntityIndexUpdate(message); - } else if (event instanceof InitializeApplicationIndexEvent) { - //does not return observable - handleInitializeApplicationIndex(event, message); - indexoperationObservable = Observable.just(new IndexOperationMessage()); - } else { - throw new Exception("Unknown EventType");//TODO: print json instead + List<IndexEventResult> indexEventResults = messages.stream() + .map(message -> { + AsyncEvent event = null; + try { + event = (AsyncEvent) message.getBody(); + } catch (ClassCastException cce) { + logger.error("Failed to deserialize message body", cce); } - //collect all of the - IndexOperationMessage indexOperationMessage = - indexoperationObservable - .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) - .toBlocking().lastOrDefault(null); + if (event == null) { + logger.error("AsyncEvent type or event is null!"); + return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis()); + } - if (indexOperationMessage == null || indexOperationMessage.isEmpty()) { - logger.info("Received empty index sequence message:({}), body:({}) ", - message.getMessageId(),message.getStringBody()); + final AsyncEvent thisEvent = event; + if (logger.isDebugEnabled()) { + logger.debug("Processing {} event", event); } - //return type that can be indexed and ack'd later - return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime()); - } catch (Exception e) { - logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody() ,e); - return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime()); - } - }); - //resolve the list and return it. - final List<IndexEventResult> indexEventResults = masterObservable - .collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) ) - .toBlocking().lastOrDefault(null); + try { + //check for empty sets + boolean validateEmptySets = true; + Observable<IndexOperationMessage> indexoperationObservable; + //merge each operation to a master observable; + if (event instanceof EdgeDeleteEvent) { + indexoperationObservable = handleEdgeDelete(message); + } else if (event instanceof EdgeIndexEvent) { + indexoperationObservable = handleEdgeIndex(message); + } else if (event instanceof EntityDeleteEvent) { + indexoperationObservable = handleEntityDelete(message); + } else if (event instanceof EntityIndexEvent) { + indexoperationObservable = handleEntityIndexUpdate(message); + } else if (event instanceof InitializeApplicationIndexEvent) { + //does not return observable + handleInitializeApplicationIndex(event, message); + indexoperationObservable = Observable.just(new IndexOperationMessage()); + validateEmptySets = false; + } else { + throw new Exception("Unknown EventType");//TODO: print json instead + } + + //collect all of the + IndexOperationMessage indexOperationMessage = + indexoperationObservable + .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) + .toBlocking().lastOrDefault(null); + + if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) { + logger.error("Received empty index sequence message:({}), body:({}) ", + message.getMessageId(), message.getStringBody()); + } + + //return type that can be indexed and ack'd later + return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime()); + } catch (Exception e) { + logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e); + return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime()); + } + }) + .collect(Collectors.toList()); return indexEventResults;