GitHub user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1036#discussion_r191585908 --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java --- @@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) { } public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) { + + if(!Iterables.isEmpty(tuples)) { + LOG.error("Failing tuples; count={}, error={}", Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)); + } tuples.forEach(t -> collector.ack(t)); MetronError error = new MetronError() .withSensorType(sensorType) .withErrorType(Constants.ErrorType.INDEXING_ERROR) .withThrowable(e); + tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); + ErrorUtils.handleError(collector, error); + } + + /** + * Error a set of tuples that may not contain a valid message. + * + * <p>Without a valid message, the source type is unknown. + * <p>Without a valid message, the JSON message cannot be added to the error. + * + * @param e The exception that occurred. + * @param tuples The tuples to error that may not contain valid messages. + */ + public void error(Throwable e, Iterable<Tuple> tuples) { + if(!Iterables.isEmpty(tuples)) { - LOG.error("Failing {} tuples", Iterables.size(tuples), e); + LOG.error("Failing tuples; count={}, error={}", Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)); } - tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); + tuples.forEach(t -> collector.ack(t)); --- End diff -- Wait, @cestella, isn't it already acked in line 123?
---