Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/1036#discussion_r191588901 --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java --- @@ -213,60 +214,144 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { + if (isTick(tuple)) { - try { - if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { - //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. - LOG.debug("Flushing message queues older than their batchTimeouts"); - getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) - , messageGetStrategy); - } - } - catch(Exception e) { - throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); - } - finally { - collector.ack(tuple); - } - return; + handleTick(tuple); + + } else { + handleMessage(tuple); } + } + + /** + * Handle a tuple containing a message; anything other than a tick tuple. + * + * @param tuple The tuple containing a message. + */ + private void handleMessage(Tuple tuple) { + try { + + JSONObject message = getMessage(tuple); + if(message == null) { + handleMissingMessage(tuple); + return; + } - try - { - JSONObject message = (JSONObject) messageGetStrategy.get(tuple); String sensorType = MessageUtils.getSensorType(message); - LOG.trace("Writing enrichment message: {}", message); - WriterConfiguration writerConfiguration = configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); if(sensorType == null) { - //sensor type somehow ended up being null. We want to error this message directly. - getWriterComponent().error("null" - , new Exception("Sensor type is not specified for message " - + message.toJSONString() - ) - , ImmutableList.of(tuple) - , messageGetStrategy - ); - } - else { - if (writerConfiguration.isDefault(sensorType)) { - //want to warn, but not fail the tuple - collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); - } - - getWriterComponent().write(sensorType - , tuple - , message - , bulkMessageWriter - , writerConfiguration - , messageGetStrategy - ); + handleMissingSensorType(tuple, message); + return; } + + writeMessage(tuple, message, sensorType); + + } catch (Exception e) { + throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); } - catch(Exception e) { + } + + /** + * Handles a tick tuple. + * + * @param tickTuple The tick tuple. + */ + private void handleTick(Tuple tickTuple) { + + try { + if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { + //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. + LOG.debug("Flushing message queues older than their batchTimeouts"); + getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) + , messageGetStrategy); + } + + } catch(Exception e) { throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); + + } finally { + collector.ack(tickTuple); + } + } + + /** + * Retrieves the JSON message contained in a tuple. + * + * @param tuple The tuple containing a JSON message. + * @return The JSON message contained in the tuple. If none, returns null. + */ + private JSONObject getMessage(Tuple tuple) { + --- End diff -- Lines 284, 288, 292
---