Github user tnine commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/396#discussion_r41899468
--- Diff:
stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
---
@@ -259,71 +259,71 @@ public void ack(final List<QueueMessage> messages) {
logger.debug("callEventHandlers with {} message",
messages.size());
}
- Observable<IndexEventResult> masterObservable =
Observable.from(messages).map(message -> {
- AsyncEvent event = null;
- try {
- event = (AsyncEvent) message.getBody();
- } catch (ClassCastException cce) {
- logger.error("Failed to deserialize message body", cce);
- }
-
- if (event == null) {
- logger.error("AsyncEvent type or event is null!");
- return new
IndexEventResult(Optional.fromNullable(message),
Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
- }
-
- final AsyncEvent thisEvent = event;
- if (logger.isDebugEnabled()) {
- logger.debug("Processing {} event", event);
- }
-
- try {
- Observable<IndexOperationMessage> indexoperationObservable;
- //merge each operation to a master observable;
- if (event instanceof EdgeDeleteEvent) {
- indexoperationObservable = handleEdgeDelete(message);
- } else if (event instanceof EdgeIndexEvent) {
- indexoperationObservable = handleEdgeIndex(message);
- } else if (event instanceof EntityDeleteEvent) {
- indexoperationObservable = handleEntityDelete(message);
- } else if (event instanceof EntityIndexEvent) {
- indexoperationObservable =
handleEntityIndexUpdate(message);
- } else if (event instanceof
InitializeApplicationIndexEvent) {
- //does not return observable
- handleInitializeApplicationIndex(event, message);
- indexoperationObservable = Observable.just(new
IndexOperationMessage());
- } else {
- throw new Exception("Unknown EventType");//TODO: print
json instead
+ Stream<IndexEventResult> indexEventResults = messages.stream()
+ .map(message -> {
+ AsyncEvent event = null;
+ try {
+ event = (AsyncEvent) message.getBody();
+ } catch (ClassCastException cce) {
+ logger.error("Failed to deserialize message body",
cce);
}
- //collect all of the
- IndexOperationMessage indexOperationMessage =
- indexoperationObservable
- .collect(() -> new IndexOperationMessage(),
(collector, single) -> collector.ingest(single))
- .toBlocking().lastOrDefault(null);
+ if (event == null) {
+ logger.error("AsyncEvent type or event is null!");
+ return new
IndexEventResult(Optional.fromNullable(message),
Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
+ }
- if (indexOperationMessage == null ||
indexOperationMessage.isEmpty()) {
- logger.info("Received empty index sequence
message:({}), body:({}) ",
- message.getMessageId(),message.getStringBody());
+ final AsyncEvent thisEvent = event;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing {} event", event);
}
- //return type that can be indexed and ack'd later
- return new
IndexEventResult(Optional.fromNullable(message),
Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
- } catch (Exception e) {
- logger.error("Failed to index message: " +
message.getMessageId(), message.getStringBody() ,e);
- return new IndexEventResult(Optional.absent(),
Optional.<IndexOperationMessage>absent(), event.getCreationTime());
- }
- });
- //resolve the list and return it.
- final List<IndexEventResult> indexEventResults = masterObservable
- .collect(() -> new ArrayList<IndexEventResult>(),
(list,indexEventResult) -> list.add(indexEventResult) )
- .toBlocking().lastOrDefault(null);
+ try {
+ //check for empty sets if this is true
+ boolean validateEmptySets = true;
+ Observable<IndexOperationMessage>
indexoperationObservable;
+ //merge each operation to a master observable;
+ if (event instanceof EdgeDeleteEvent) {
+ indexoperationObservable =
handleEdgeDelete(message);
+ } else if (event instanceof EdgeIndexEvent) {
+ indexoperationObservable =
handleEdgeIndex(message);
+ } else if (event instanceof EntityDeleteEvent) {
+ indexoperationObservable =
handleEntityDelete(message);
+ } else if (event instanceof EntityIndexEvent) {
+ indexoperationObservable =
handleEntityIndexUpdate(message);
+ } else if (event instanceof
InitializeApplicationIndexEvent) {
+ //does not return observable
+ handleInitializeApplicationIndex(event, message);
+ indexoperationObservable = Observable.just(new
IndexOperationMessage());
+ validateEmptySets = false; //do not check this one
for an empty set b/c it will be empty.
+ } else {
+ throw new Exception("Unknown EventType");//TODO:
print json instead
+ }
+
+ //collect all of the
+ IndexOperationMessage indexOperationMessage =
+ indexoperationObservable
+ .collect(() -> new IndexOperationMessage(),
(collector, single) -> collector.ingest(single))
+ .toBlocking().lastOrDefault(null);
+
+ if (validateEmptySets && (indexOperationMessage ==
null || indexOperationMessage.isEmpty())) {
+ logger.error("Received empty index sequence
message:({}), body:({}) ",
+ message.getMessageId(),
message.getStringBody());
+ throw new Exception("Received empty index
sequence.");
--- End diff --
Won't throwing an exception here cause the entire stream to blow up, so that
other messages in the batch that would have processed fine never get acked?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---