METRON-1347: Indexing Topology should fail tuples without a source.type (cstella via mmiklavc) closes apache/metron#863
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/bfe90ef1 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/bfe90ef1 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/bfe90ef1 Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual Commit: bfe90ef1e579be53a14d9fd0e4dc19fc6a81baf0 Parents: 53124d9 Author: cstella <ceste...@gmail.com> Authored: Fri Apr 13 11:17:00 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Fri Apr 13 11:17:00 2018 -0600 ---------------------------------------------------------------------- .../bolt/BulkMessageWriterBoltTest.java | 25 ++++++++++ metron-platform/metron-indexing/README.md | 6 +++ .../writer/bolt/BulkMessageWriterBolt.java | 51 ++++++++++++++------ 3 files changed, 68 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index 308638e..dedf5e6 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -118,6 +118,31 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { private MessageGetStrategy messageGetStrategy; @Test + public void testSensorTypeMissing() throws Exception { + BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") + 
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withMessageGetterField("message"); + bulkMessageWriterBolt.setCuratorFramework(client); + bulkMessageWriterBolt.setZKCache(cache); + bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, + new FileInputStream(sampleSensorIndexingConfigPath)); + + bulkMessageWriterBolt.declareOutputFields(declarer); + verify(declarer, times(1)).declareStream(eq("error"), argThat( + new FieldsMatcher("message"))); + Map stormConf = new HashMap(); + bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector); + BulkWriterComponent<JSONObject> component = mock(BulkWriterComponent.class); + bulkMessageWriterBolt.setWriterComponent(component); + verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class)); + JSONObject message = (JSONObject) new JSONParser().parse(sampleMessageString); + message.remove("source.type"); + when(tuple.getValueByField("message")).thenReturn(message); + bulkMessageWriterBolt.execute(tuple); + verify(component, times(1)).error(eq("null"), any(), any(), any()); + } + + @Test public void testFlushOnBatchSize() throws Exception { BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-indexing/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md index d351d7c..f4a4501 100644 --- a/metron-platform/metron-indexing/README.md +++ b/metron-platform/metron-indexing/README.md @@ -32,6 +32,12 @@ Indices are written in batch and the batch size and batch timeout are specified [Sensor Indexing Configuration](#sensor-indexing-configuration) via 
the `batchSize` and `batchTimeout` parameters. These configs are variable by sensor type. +## Minimal Assumptions for Message Structure + +At minimum, a message must have a `source.type` field. +Without this field, the message tuple is failed rather than written, +and an appropriate error is reported in the Storm UI and logs. + ## Indexing Architecture ![Architecture](indexing_arch.png) http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index 8202604..b5b97d8 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -17,6 +17,7 @@ */ package org.apache.metron.writer.bolt; +import com.google.common.collect.ImmutableList; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredIndexingBolt; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; @@ -125,6 +126,13 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { return defaultBatchTimeout; } + public BulkWriterComponent<JSONObject> getWriterComponent() { + return writerComponent; + } + + public void setWriterComponent(BulkWriterComponent<JSONObject> component) { + writerComponent = component; + } /** * This method is called by TopologyBuilder.createTopology() to obtain topology and * bolt specific configuration parameters.
We use it primarily to configure how often @@ -160,9 +168,11 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { return conf; } + + @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.writerComponent = new BulkWriterComponent<>(collector); + setWriterComponent(new BulkWriterComponent<>(collector)); this.collector = collector; super.prepare(stormConf, context, collector); if (messageGetField != null) { @@ -185,7 +195,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor); defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout(); } - writerComponent.setDefaultBatchTimeout(defaultBatchTimeout); + getWriterComponent().setDefaultBatchTimeout(defaultBatchTimeout); bulkMessageWriter.init(stormConf, context, writerconf); } catch (Exception e) { throw new RuntimeException(e); @@ -197,7 +207,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { */ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) { prepare(stormConf, context, collector); - writerComponent.withClock(clock); + getWriterComponent().withClock(clock); } @SuppressWarnings("unchecked") @@ -208,7 +218,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. 
LOG.debug("Flushing message queues older than their batchTimeouts"); - writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply( + getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply( new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) , messageGetStrategy); } @@ -229,17 +239,30 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { LOG.trace("Writing enrichment message: {}", message); WriterConfiguration writerConfiguration = configurationTransformation.apply( new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); - if(writerConfiguration.isDefault(sensorType)) { - //want to warn, but not fail the tuple - collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); + if(sensorType == null) { + //sensor type somehow ended up being null. We want to error this message directly. + getWriterComponent().error("null" + , new Exception("Sensor type is not specified for message " + + message.toJSONString() + ) + , ImmutableList.of(tuple) + , messageGetStrategy + ); + } + else { + if (writerConfiguration.isDefault(sensorType)) { + //want to warn, but not fail the tuple + collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); + } + + getWriterComponent().write(sensorType + , tuple + , message + , bulkMessageWriter + , writerConfiguration + , messageGetStrategy + ); } - writerComponent.write(sensorType - , tuple - , message - , bulkMessageWriter - , writerConfiguration - , messageGetStrategy - ); } catch(Exception e) { throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e);