This is an automated email from the ASF dual-hosted git repository.

rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f2c802  METRON-2026 Remove Storm dependency from metron-common (merrimanr) closes apache/metron#1351
2f2c802 is described below

commit 2f2c80244b34e8182f72e6f4dc98a2940194eaa0
Author: merrimanr <merrim...@gmail.com>
AuthorDate: Mon Apr 15 16:40:51 2019 -0500

    METRON-2026 Remove Storm dependency from metron-common (merrimanr) closes apache/metron#1351
---
 .../metron/profiler/storm/ProfileSplitterBolt.java |  2 +-
 .../metron-common-storm/pom.xml                    | 89 ++++++++++++++++++++++
 .../metron/storm}/common/bolt/ConfiguredBolt.java  |  2 +-
 .../common/bolt/ConfiguredEnrichmentBolt.java      |  2 +-
 .../storm}/common/bolt/ConfiguredIndexingBolt.java |  2 +-
 .../storm}/common/bolt/ConfiguredParserBolt.java   |  2 +-
 .../storm}/common/bolt/ConfiguredProfilerBolt.java |  2 +-
 .../storm}/common/message/BytesFromPosition.java   |  2 +-
 .../storm}/common/message/JSONFromField.java       |  2 +-
 .../common/message/JSONFromFieldByReference.java   |  2 +-
 .../storm}/common/message/JSONFromPosition.java    |  2 +-
 .../storm}/common/message/MessageGetStrategy.java  |  2 +-
 .../storm}/common/message/MessageGetters.java      |  2 +-
 .../storm}/common/message/ObjectFromField.java     |  2 +-
 .../common/message/metadata/RawMessageUtil.java}   | 65 ++++++----------
 .../storm/common/utils/StormErrorUtils.java}       | 32 +++++---
 .../storm}/common/bolt/BaseConfiguredBoltTest.java |  2 +-
 .../common/bolt/ConfiguredEnrichmentBoltTest.java  | 16 ++--
 .../common/bolt/ConfiguredParserBoltTest.java      | 13 ++--
 .../storm}/common/message/MessageGettersTest.java  |  2 +-
 .../message/metadata/RawMessageUtilTest.java       |  9 ++-
 .../metron/storm}/common/utils/ErrorUtilsTest.java |  5 +-
 metron-platform/metron-common-streaming/pom.xml    | 33 ++++++++
 metron-platform/metron-common/pom.xml              | 75 ++----------------
 .../apache/metron/common/error/MetronError.java    |  4 +-
 .../common/message/metadata/MetadataUtil.java      | 53 -------------
 .../common/message/metadata/RawMessageUtil.java    | 59 --------------
 .../org/apache/metron/common/utils/ErrorUtils.java | 20 -----
 .../metron/common/writer/BulkMessageWriter.java    |  3 +-
 .../elasticsearch/writer/ElasticsearchWriter.java  |  3 +-
 .../writer/ElasticsearchWriterTest.java            | 18 ++---
 .../metron/enrichment/bolt/EnrichmentJoinBolt.java |  2 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java     | 10 +--
 .../apache/metron/enrichment/bolt/JoinBolt.java    | 10 +--
 .../apache/metron/enrichment/bolt/SplitBolt.java   |  2 +-
 .../enrichment/bolt/ThreatIntelJoinBolt.java       |  2 +-
 .../enrichment/bolt/UnifiedEnrichmentBolt.java     | 12 +--
 .../enrichment/bolt/EnrichmentJoinBoltTest.java    |  2 +-
 .../metron/enrichment/bolt/JoinBoltTest.java       |  2 +-
 .../enrichment/bolt/ThreatIntelJoinBoltTest.java   |  2 +-
 .../metron-parsing/metron-parsing-storm/pom.xml    |  2 +-
 .../org/apache/metron/parsers/bolt/ParserBolt.java | 14 ++--
 .../org/apache/metron/parsers/bolt/WriterBolt.java |  8 +-
 .../apache/metron/parsers/bolt/WriterHandler.java  |  4 +-
 .../apache/metron/parsers/bolt/ParserBoltTest.java |  2 +-
 .../apache/metron/parsers/bolt/WriterBoltTest.java |  8 +-
 .../integration/validation/StormParserDriver.java  |  2 +-
 .../org/apache/metron/solr/writer/SolrWriter.java  |  3 +-
 .../schema/SchemaValidationIntegrationTest.java    |  2 +-
 .../apache/metron/solr/writer/SolrWriterTest.java  |  4 +-
 metron-platform/metron-writer/pom.xml              |  2 +-
 .../org/apache/metron/writer/AckTuplesPolicy.java  |  2 +-
 .../java/org/apache/metron/writer/NoopWriter.java  |  3 +-
 .../apache/metron/writer/WriterToBulkWriter.java   |  3 +-
 .../metron/writer/bolt/BulkMessageWriterBolt.java  | 16 ++--
 .../writer/hbase/SimpleHbaseEnrichmentWriter.java  |  3 +-
 .../org/apache/metron/writer/hdfs/HdfsWriter.java  |  7 +-
 .../apache/metron/writer/kafka/KafkaWriter.java    |  3 +-
 .../apache/metron/writer/AckTuplesPolicyTest.java  |  2 +-
 .../metron/writer/BulkWriterComponentTest.java     |  1 -
 .../writer/bolt/BulkMessageWriterBoltTest.java     | 12 ++-
 .../apache/metron/writer/hdfs/HdfsWriterTest.java  | 51 ++++++++-----
 metron-platform/pom.xml                            |  1 +
 63 files changed, 331 insertions(+), 395 deletions(-)

diff --git 
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
 
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
index ef58ad9..2161910 100644
--- 
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
+++ 
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
@@ -20,7 +20,7 @@
 
 package org.apache.metron.profiler.storm;
 
-import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredProfilerBolt;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.profiler.DefaultMessageRouter;
 import org.apache.metron.profiler.MessageRoute;
diff --git 
a/metron-platform/metron-common-streaming/metron-common-storm/pom.xml 
b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
new file mode 100644
index 0000000..a063163
--- /dev/null
+++ b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+       Foundation (ASF) under one or more contributor license agreements. See the
+       NOTICE file distributed with this work for additional information regarding
+       copyright ownership. The ASF licenses this file to You under the Apache License,
+       Version 2.0 (the "License"); you may not use this file except in compliance
+       with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+       Unless required by applicable law or agreed to in writing, software distributed
+       under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+       OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-common-streaming</artifactId>
+        <version>0.7.1</version>
+    </parent>
+    <artifactId>metron-common-storm</artifactId>
+    <name>metron-common-storm</name>
+    <description>Components common to Storm</description>
+    <url>https://metron.apache.org/</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <commons.config.version>1.10</commons.config.version>
+    </properties>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-common</artifactId>
+        <version>${project.parent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm-core</artifactId>
+        <version>${global_storm_version}</version>
+        <scope>provided</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <artifactId>servlet-api</artifactId>
+            <groupId>javax.servlet</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <groupId>org.slf4j</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <groupId>org.apache.logging.log4j</groupId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>flux-core</artifactId>
+        <version>${global_flux_version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-client</artifactId>
+        <version>${global_curator_version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-test</artifactId>
+        <version>${global_curator_version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-test-utilities</artifactId>
+        <version>${project.parent.version}</version>
+        <scope>test</scope>
+      </dependency>
+    </dependencies>
+</project>
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredBolt.java
similarity index 99%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredBolt.java
index 221edad..a437574 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredBolt.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBolt.java
similarity index 97%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBolt.java
index 2e03a36..bde85bb 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBolt.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredIndexingBolt.java
similarity index 96%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredIndexingBolt.java
index 27e081e..2d650f0 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredIndexingBolt.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
 
 import java.lang.invoke.MethodHandles;
 import org.apache.metron.common.configuration.IndexingConfigurations;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredParserBolt.java
similarity index 97%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredParserBolt.java
index 17b614b..5074271 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredParserBolt.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredProfilerBolt.java
similarity index 97%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredProfilerBolt.java
index e4d9f7b..b4207c2 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredProfilerBolt.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
 
 
 import java.lang.invoke.MethodHandles;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/BytesFromPosition.java
similarity index 96%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/BytesFromPosition.java
index 56c6490..b47f43c 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/BytesFromPosition.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
 
 import org.apache.storm.tuple.Tuple;
 
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromField.java
similarity index 96%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromField.java
index 39fe9dd..b8df105 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromField.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
 
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromFieldByReference.java
similarity index 96%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromFieldByReference.java
index a0d4b7d..1868bc7 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromFieldByReference.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
 
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromPosition.java
similarity index 97%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromPosition.java
index 15f0447..b81373e 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromPosition.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
 
 import org.apache.commons.io.Charsets;
 import org.apache.storm.tuple.Tuple;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetStrategy.java
similarity index 95%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetStrategy.java
index 0595ce1..dfe8fff 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetStrategy.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
 
 import org.apache.storm.tuple.Tuple;
 
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetters.java
similarity index 98%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetters.java
index 46bb406..36ad1c9 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetters.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
 
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/ObjectFromField.java
similarity index 96%
copy from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
copy to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/ObjectFromField.java
index 120c09c..dced7d6 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/ObjectFromField.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
 
 import org.apache.storm.tuple.Tuple;
 
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/metadata/RawMessageUtil.java
similarity index 67%
copy from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
copy to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/metadata/RawMessageUtil.java
index 3034ddd..afd3a87 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/metadata/RawMessageUtil.java
@@ -15,9 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message.metadata;
+package org.apache.metron.storm.common.message.metadata;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.message.metadata.MetadataUtil;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.message.metadata.RawMessageStrategy;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
@@ -29,51 +32,29 @@ import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 
-/**
- * Captures some common utility methods around metadata manipulation.
- */
-public enum MetadataUtil {
+public enum RawMessageUtil {
+
   INSTANCE;
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  /**
-   * The default metadata prefix.
-   */
-  public static final String METADATA_PREFIX = "metron.metadata";
-  /**
-   * The config key for defining the prefix.
-   */
-  public static final String METADATA_PREFIX_CONFIG = "metadataPrefix";
-  static final int KEY_INDEX = 1;
 
-  /**
-   * Return the prefix that we want to use for metadata keys.  This comes from 
the config and is defaulted to
-   * 'metron.metadata'.
-   *
-   * @param config The rawMessageStrategyConfig
-   * @return the prefix for metadata keys
-   */
-  public String getMetadataPrefix(Map<String, Object> config) {
-    String prefix = (String) config.getOrDefault(METADATA_PREFIX_CONFIG, 
METADATA_PREFIX);
-    if(StringUtils.isEmpty(prefix)) {
-      return null;
-    }
-    return prefix;
-  }
+  static final int KEY_INDEX = 1;
 
   /**
-   * Take a field and prefix it with the metadata key.
-   *
-   * @param prefix The metadata prefix to use (e.g. 'foo')
-   * @param key The key name (e.g. my_field)
-   * @return The prefixed key separated by a . (e.g. foo.my_field)
+   * Extract the raw message given the strategy, the tuple and the metadata 
configs.
+   * @param strategy The {@link RawMessageStrategy} to use for extraction
+   * @param t The tuple to pull the message from
+   * @param rawMessage The raw message in bytes
+   * @param readMetadata True if read metadata, false otherwise
+   * @param config The config to use during extraction
+   * @return The resulting {@link RawMessage}
    */
-  public String prefixKey(String prefix, String key) {
-    if(StringUtils.isEmpty(prefix)) {
-      return key;
-    }
-    else {
-      return prefix + "." + key;
+  public RawMessage getRawMessage(RawMessageStrategy strategy, Tuple t, byte[] 
rawMessage, boolean readMetadata, Map<String, Object> config) {
+    Map<String, Object> metadata = new HashMap<>();
+    if(readMetadata) {
+      String prefix = MetadataUtil.INSTANCE.getMetadataPrefix(config);
+      metadata = extractMetadata(prefix, t);
     }
+    return strategy.get(metadata, rawMessage, readMetadata, config);
   }
 
   /**
@@ -101,7 +82,7 @@ public enum MetadataUtil {
       String envMetadataFieldName = tupleFields.get(i);
       Object envMetadataFieldValue = t.getValue(i);
       if (!StringUtils.isEmpty(envMetadataFieldName) && envMetadataFieldValue 
!= null) {
-        metadata.put(prefixKey(prefix, envMetadataFieldName), 
envMetadataFieldValue);
+        metadata.put(MetadataUtil.INSTANCE.prefixKey(prefix, 
envMetadataFieldName), envMetadataFieldValue);
       }
     }
     byte[] keyObj = t.getBinary(KEY_INDEX);
@@ -111,7 +92,7 @@ public enum MetadataUtil {
       if (!StringUtils.isEmpty(keyStr)) {
         Map<String, Object> rawMetadata = JSONUtils.INSTANCE.load(keyStr, 
JSONUtils.MAP_SUPPLIER);
         for (Map.Entry<String, Object> kv : rawMetadata.entrySet()) {
-          metadata.put(prefixKey(prefix, kv.getKey()), kv.getValue());
+          metadata.put(MetadataUtil.INSTANCE.prefixKey(prefix, kv.getKey()), 
kv.getValue());
         }
 
       }
@@ -122,4 +103,6 @@ public enum MetadataUtil {
     }
     return metadata;
   }
+
+
 }
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/utils/StormErrorUtils.java
similarity index 51%
rename from 
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/utils/StormErrorUtils.java
index 120c09c..cd7f93d 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/utils/StormErrorUtils.java
@@ -15,22 +15,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
 
-import org.apache.storm.tuple.Tuple;
+package org.apache.metron.storm.common.utils;
 
-public class ObjectFromField implements MessageGetStrategy {
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.error.MetronError;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
 
-  private String fieldValue = "message";
+import java.util.Optional;
 
-  public ObjectFromField() {};
+public class StormErrorUtils {
+  /**
+   * Handles a {@link MetronError} that occurs.
+   *
+   * @param collector The Storm output collector being reported to
+   * @param error The error that occurred
+   */
+  public static void handleError(OutputCollector collector, MetronError error)
+  {
+    collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
+    Optional<Throwable> throwable = error.getThrowable();
+    if (throwable.isPresent()) {
+      collector.reportError(throwable.get());
+    }
 
-  public ObjectFromField(String fieldValue) {
-    this.fieldValue = fieldValue;
-  }
-
-  @Override
-  public Object get(Tuple tuple) {
-    return tuple.getValueByField(fieldValue);
   }
 }
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/BaseConfiguredBoltTest.java
similarity index 97%
rename from 
metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/BaseConfiguredBoltTest.java
index f9901cd..ac2d1e8 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/BaseConfiguredBoltTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
 
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.junit.Assert;
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBoltTest.java
similarity index 91%
rename from 
metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBoltTest.java
index 44612cd..256415a 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBoltTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
 
 import org.apache.log4j.Level;
 import org.apache.metron.test.utils.UnitTestHelper;
@@ -37,6 +37,10 @@ import java.util.Set;
 
 public class ConfiguredEnrichmentBoltTest extends BaseConfiguredBoltTest {
 
+  private static final String sampleConfigPath = "../" + 
TestConstants.SAMPLE_CONFIG_PATH;
+  private static final String enrichmentsConfigPath = "../" + 
TestConstants.ENRICHMENTS_CONFIGS_PATH;
+  private static final String parserConfigsPath = "../" + 
TestConstants.PARSER_CONFIGS_PATH;
+
   private Set<String> enrichmentConfigurationTypes = new HashSet<>();
   private String zookeeperUrl;
 
@@ -64,15 +68,15 @@ public class ConfiguredEnrichmentBoltTest extends 
BaseConfiguredBoltTest {
   public void setupConfiguration() throws Exception {
     TestingServer testZkServer = new TestingServer(true);
     this.zookeeperUrl = testZkServer.getConnectString();
-    byte[] globalConfig = 
ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    byte[] globalConfig = 
ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigPath);
     ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, 
zookeeperUrl);
     enrichmentConfigurationTypes.add(ConfigurationType.GLOBAL.getTypeName());
-    Map<String, byte[]> sensorEnrichmentConfigs = 
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.ENRICHMENTS_CONFIGS_PATH);
+    Map<String, byte[]> sensorEnrichmentConfigs = 
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(enrichmentsConfigPath);
     for (String sensorType : sensorEnrichmentConfigs.keySet()) {
       ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, 
sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
       enrichmentConfigurationTypes.add(sensorType);
     }
-    Map<String, byte[]> sensorParserConfigs = 
ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
+    Map<String, byte[]> sensorParserConfigs = 
ConfigurationsUtils.readSensorParserConfigsFromFile(parserConfigsPath);
     for (String sensorType : sensorParserConfigs.keySet()) {
       ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorType, 
sensorParserConfigs.get(sensorType), zookeeperUrl);
     }
@@ -90,8 +94,8 @@ public class ConfiguredEnrichmentBoltTest extends 
BaseConfiguredBoltTest {
     UnitTestHelper.setLog4jLevel(ConfiguredBolt.class, Level.ERROR);
 
     configsUpdated = new HashSet<>();
-    
sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
-    Map<String, byte[]> sensorEnrichmentConfigs = 
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.ENRICHMENTS_CONFIGS_PATH);
+    
sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigPath));
+    Map<String, byte[]> sensorEnrichmentConfigs = 
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(enrichmentsConfigPath);
     for (String sensorType : sensorEnrichmentConfigs.keySet()) {
       sampleConfigurations.updateSensorEnrichmentConfig(sensorType, 
sensorEnrichmentConfigs.get(sensorType));
     }
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredParserBoltTest.java
similarity index 94%
rename from 
metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredParserBoltTest.java
index 3deba78..66668d1 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredParserBoltTest.java
@@ -15,11 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
 
 import org.apache.log4j.Level;
 import org.apache.metron.common.configuration.FieldValidator;
-import org.apache.metron.common.field.validation.FieldValidation;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
@@ -67,14 +66,14 @@ public class ConfiguredParserBoltTest extends 
BaseConfiguredBoltTest {
   public void setupConfiguration() throws Exception {
     TestingServer testZkServer = new TestingServer(true);
     this.zookeeperUrl = testZkServer.getConnectString();
-    byte[] globalConfig = 
ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile("../" + 
TestConstants.SAMPLE_CONFIG_PATH);
     ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, 
zookeeperUrl);
     parserConfigurationTypes.add(ConfigurationType.GLOBAL.getTypeName());
-    Map<String, byte[]> sensorEnrichmentConfigs = 
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.ENRICHMENTS_CONFIGS_PATH);
+    Map<String, byte[]> sensorEnrichmentConfigs = 
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile("../" + 
TestConstants.ENRICHMENTS_CONFIGS_PATH);
     for (String sensorType : sensorEnrichmentConfigs.keySet()) {
       ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, 
sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
     }
-    Map<String, byte[]> sensorParserConfigs = 
ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
+    Map<String, byte[]> sensorParserConfigs = 
ConfigurationsUtils.readSensorParserConfigsFromFile("../" + 
TestConstants.PARSER_CONFIGS_PATH);
     for (String sensorType : sensorParserConfigs.keySet()) {
       ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorType, 
sensorParserConfigs.get(sensorType), zookeeperUrl);
       parserConfigurationTypes.add(sensorType);
@@ -93,8 +92,8 @@ public class ConfiguredParserBoltTest extends 
BaseConfiguredBoltTest {
     UnitTestHelper.setLog4jLevel(ConfiguredBolt.class, Level.ERROR);
 
     configsUpdated = new HashSet<>();
-    
sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
-    Map<String, byte[]> sensorParserConfigs = 
ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
+    
sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile("../"
 + TestConstants.SAMPLE_CONFIG_PATH));
+    Map<String, byte[]> sensorParserConfigs = 
ConfigurationsUtils.readSensorParserConfigsFromFile("../" + 
TestConstants.PARSER_CONFIGS_PATH);
     for (String sensorType : sensorParserConfigs.keySet()) {
       sampleConfigurations.updateSensorParserConfig(sensorType, 
sensorParserConfigs.get(sensorType));
     }
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/MessageGettersTest.java
similarity index 99%
rename from 
metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/MessageGettersTest.java
index ea7583a..aa3c0a0 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/MessageGettersTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
 
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/metadata/RawMessageUtilTest.java
similarity index 96%
rename from 
metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/metadata/RawMessageUtilTest.java
index e5fd80f..cf51f02 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/metadata/RawMessageUtilTest.java
@@ -15,18 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.message.metadata;
+package org.apache.metron.storm.common.message.metadata;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.message.metadata.EnvelopedRawMessageStrategy;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.message.metadata.RawMessageStrategies;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.metron.common.message.metadata.MetadataUtil;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -54,7 +57,7 @@ public class RawMessageUtilTest {
     }
 
     when(t.getFields()).thenReturn(f);
-    
when(t.getBinary(eq(MetadataUtil.KEY_INDEX))).thenReturn(metadata.getBytes());
+    
when(t.getBinary(eq(RawMessageUtil.KEY_INDEX))).thenReturn(metadata.getBytes());
     return t;
   }
 
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/utils/ErrorUtilsTest.java
similarity index 95%
rename from 
metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
rename to 
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/utils/ErrorUtilsTest.java
index 77ea9da..037a06c 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
+++ 
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/utils/ErrorUtilsTest.java
@@ -15,10 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.utils;
+package org.apache.metron.storm.common.utils;
 
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
 import org.apache.storm.task.OutputCollector;
 import org.junit.Rule;
@@ -79,7 +80,7 @@ public class ErrorUtilsTest {
     MetronError error = new MetronError().withMessage("error 
message").withThrowable(e);
     OutputCollector collector = mock(OutputCollector.class);
 
-    ErrorUtils.handleError(collector, error);
+    StormErrorUtils.handleError(collector, error);
     verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new 
MetronErrorJSONMatcher(error.getJSONObject())));
     verify(collector, times(1)).reportError(any());
   }
diff --git a/metron-platform/metron-common-streaming/pom.xml 
b/metron-platform/metron-common-streaming/pom.xml
new file mode 100644
index 0000000..4746b9e
--- /dev/null
+++ b/metron-platform/metron-common-streaming/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>metron-common-streaming</artifactId>
+  <packaging>pom</packaging>
+  <name>metron-common-streaming</name>
+  <parent>
+    <artifactId>metron-platform</artifactId>
+    <groupId>org.apache.metron</groupId>
+    <version>0.7.1</version>
+  </parent>
+  <description>Common modules for Streaming platforms</description>
+  <url>https://metron.apache.org/</url>
+  <scm>
+    
<connection>scm:git:https://gitbox.apache.org/repos/asf/metron.git</connection>
+    
<developerConnection>scm:git:https://gitbox.apache.org/repos/asf/metron.git</developerConnection>
+    <tag>HEAD</tag>
+    <url>https://gitbox.apache.org/repos/asf/metron.git</url>
+  </scm>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+  <modules>
+    <module>metron-common-storm</module>
+  </modules>
+</project>
\ No newline at end of file
diff --git a/metron-platform/metron-common/pom.xml 
b/metron-platform/metron-common/pom.xml
index 4735044..6835933 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -29,6 +29,7 @@
         
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <commons.config.version>1.10</commons.config.version>
         <guava_version>${global_guava_version}</guava_version>
+        <asm.version>5.0.3</asm.version>
     </properties>
     <repositories>
         <repository>
@@ -85,34 +86,6 @@
             <version>${global_antlr_version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${global_storm_version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-slf4j-impl</artifactId>
-                    <groupId>org.apache.logging.log4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>
             <version>${global_kafka_version}</version>
@@ -218,18 +191,6 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-common</artifactId>
-            <version>${global_hbase_version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-mapreduce-client-core</artifactId>
-                </exclusion>
-            </exclusions>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
             <version>${global_hadoop_version}</version>
@@ -257,33 +218,9 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <version>${global_hbase_version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-auth</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-common</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
                 <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
@@ -303,9 +240,9 @@
             <version>${global_jackson_version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>flux-core</artifactId>
-            <version>${global_flux_version}</version>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+            <version>${asm.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
index df21c3b..46fe3b2 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
@@ -23,6 +23,7 @@ import static org.apache.metron.common.Constants.ErrorFields;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,7 +35,6 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.Constants.ErrorType;
 import org.apache.metron.common.utils.HashUtils;
@@ -180,7 +180,7 @@ public class MetronError {
         // It's unclear if we need a rawMessageBytes field so commenting out 
for now
         //String rawMessageBytesField = rawMessages.size() == 1 ? 
ErrorFields.RAW_MESSAGE_BYTES.getName() : 
ErrorFields.RAW_MESSAGE_BYTES.getName() + "_" + i;
         if(rawMessage instanceof byte[]) {
-          errorMessage.put(rawMessageField, 
Bytes.toString((byte[])rawMessage));
+          errorMessage.put(rawMessageField, new String((byte[])rawMessage, 
Charset.forName("UTF-8")));
           //errorMessage.put(rawMessageBytesField, 
com.google.common.primitives.Bytes.asList((byte[])rawMessage));
         } else if (rawMessage instanceof JSONObject) {
           JSONObject rawMessageJSON = (JSONObject) rawMessage;
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
index 3034ddd..628ef50 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
@@ -18,15 +18,10 @@
 package org.apache.metron.common.message.metadata;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -43,7 +38,6 @@ public enum MetadataUtil {
    * The config key for defining the prefix.
    */
   public static final String METADATA_PREFIX_CONFIG = "metadataPrefix";
-  static final int KEY_INDEX = 1;
 
   /**
    * Return the prefix that we want to use for metadata keys.  This comes from 
the config and is defaulted to
@@ -75,51 +69,4 @@ public enum MetadataUtil {
       return prefix + "." + key;
     }
   }
-
-  /**
-   * Default extraction of metadata.  This handles looking in the normal 
places for metadata
-   * <ul>
-   *   <li>The kafka key</li>
-   *   <li>The tuple fields outside of the value (e.g. the topic)</li>
-   * </ul>
-   *
-   * <p>In addition to extracting the metadata into a map, it applies the 
appropriate prefix (as configured in the rawMessageStrategyConfig).
-   * @param prefix The prefix of the metadata keys
-   * @param t The tuple to get metadata from
-   * @return A map containing the metadata
-   */
-  public Map<String, Object> extractMetadata(String prefix, Tuple t) {
-    Map<String, Object> metadata = new HashMap<>();
-    if(t == null) {
-      return metadata;
-    }
-    Fields tupleFields = t.getFields();
-    if(tupleFields == null) {
-      return metadata;
-    }
-    for (int i = 2; i < tupleFields.size(); ++i) {
-      String envMetadataFieldName = tupleFields.get(i);
-      Object envMetadataFieldValue = t.getValue(i);
-      if (!StringUtils.isEmpty(envMetadataFieldName) && envMetadataFieldValue 
!= null) {
-        metadata.put(prefixKey(prefix, envMetadataFieldName), 
envMetadataFieldValue);
-      }
-    }
-    byte[] keyObj = t.getBinary(KEY_INDEX);
-    String keyStr = null;
-    try {
-      keyStr = keyObj == null ? null : new String(keyObj);
-      if (!StringUtils.isEmpty(keyStr)) {
-        Map<String, Object> rawMetadata = JSONUtils.INSTANCE.load(keyStr, 
JSONUtils.MAP_SUPPLIER);
-        for (Map.Entry<String, Object> kv : rawMetadata.entrySet()) {
-          metadata.put(prefixKey(prefix, kv.getKey()), kv.getValue());
-        }
-
-      }
-    } catch (IOException e) {
-      String reason = "Unable to parse metadata; expected JSON Map: " + 
(keyStr == null ? "NON-STRING!" : keyStr);
-      LOG.error(reason, e);
-      throw new IllegalStateException(reason, e);
-    }
-    return metadata;
-  }
 }
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
deleted file mode 100644
index 3bd9915..0000000
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.message.metadata;
-
-import com.google.common.base.Joiner;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-
-public enum RawMessageUtil {
-
-  INSTANCE;
-
-
-  /**
-   * Extract the raw message given the strategy, the tuple and the metadata 
configs.
-   * @param strategy The {@link RawMessageStrategy} to use for extraction
-   * @param t The tuple to pull the message from
-   * @param rawMessage The raw message in bytes
-   * @param readMetadata True if read metadata, false otherwise
-   * @param config The config to use during extraction
-   * @return The resulting {@link RawMessage}
-   */
-  public RawMessage getRawMessage(RawMessageStrategy strategy, Tuple t, byte[] 
rawMessage, boolean readMetadata, Map<String, Object> config) {
-    Map<String, Object> metadata = new HashMap<>();
-    if(readMetadata) {
-      String prefix = MetadataUtil.INSTANCE.getMetadataPrefix(config);
-      metadata = MetadataUtil.INSTANCE.extractMetadata(prefix, t);
-    }
-    return strategy.get(metadata, rawMessage, readMetadata, config);
-  }
-
-
-}
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index e3ad306..bd666c3 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -26,10 +26,6 @@ import java.lang.management.ThreadMXBean;
 import java.util.Optional;
 import java.util.function.Function;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.error.MetronError;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,22 +86,6 @@ public class ErrorUtils {
   }
 
   /**
-   * Handles a {@link MetronError} that occurs.
-   *
-   * @param collector The Storm output collector being reported to
-   * @param error The error that occurred
-   */
-  public static void handleError(OutputCollector collector, MetronError error)
-  {
-    collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
-    Optional<Throwable> throwable = error.getThrowable();
-    if (throwable.isPresent()) {
-      collector.reportError(throwable.get());
-    }
-
-  }
-
-  /**
    * Generates a string version of a thread dump.
    *
    * @return String of the thread dump
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
index 7f92d37..51bc599 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.metron.common.writer;
 
-import org.apache.storm.task.TopologyContext;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 
 import java.io.Serializable;
@@ -26,7 +25,7 @@ import java.util.Map;
 
 public interface BulkMessageWriter<MESSAGE_T> extends AutoCloseable, 
Serializable {
 
-  void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration config) throws Exception;
+  void init(Map stormConf, WriterConfiguration config) throws Exception;
 
   /**
   * Writes the messages to a particular output (e.g. Elasticsearch).  A 
response is returned with successful and failed message ids.
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 7d14c11..b7814b6 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -33,7 +33,6 @@ import 
org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.storm.task.TopologyContext;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +74,7 @@ public class ElasticsearchWriter implements 
BulkMessageWriter<JSONObject>, Seria
   private SimpleDateFormat dateFormat;
 
   @Override
-  public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration configurations) {
+  public void init(Map stormConf, WriterConfiguration configurations) {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
     dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
 
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
index 3d6a3fa..ba5cfe0 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
@@ -25,7 +25,6 @@ import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
 import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
-import org.apache.storm.task.TopologyContext;
 import org.json.simple.JSONObject;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,13 +46,10 @@ import static org.mockito.Mockito.when;
 public class ElasticsearchWriterTest {
 
     Map stormConf;
-    TopologyContext topologyContext;
     WriterConfiguration writerConfiguration;
 
     @Before
     public void setup() {
-        topologyContext = mock(TopologyContext.class);
-
         writerConfiguration = mock(WriterConfiguration.class);
         when(writerConfiguration.getGlobalConfig()).thenReturn(globals());
 
@@ -74,7 +70,7 @@ public class ElasticsearchWriterTest {
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
-        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        esWriter.init(stormConf, writerConfiguration);
         BulkWriterResponse response = esWriter.write("bro", 
writerConfiguration, messages);
 
         // response should only contain successes
@@ -98,7 +94,7 @@ public class ElasticsearchWriterTest {
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
-        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        esWriter.init(stormConf, writerConfiguration);
         BulkWriterResponse response = esWriter.write("bro", 
writerConfiguration, messages);
 
         // response should only contain successes
@@ -123,7 +119,7 @@ public class ElasticsearchWriterTest {
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
-        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        esWriter.init(stormConf, writerConfiguration);
         BulkWriterResponse response = esWriter.write("bro", 
writerConfiguration, messages);
 
         // the writer response should only contain failures
@@ -151,7 +147,7 @@ public class ElasticsearchWriterTest {
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
-        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        esWriter.init(stormConf, writerConfiguration);
         BulkWriterResponse response = esWriter.write("bro", 
writerConfiguration, messages);
 
         // the writer response should only contain failures
@@ -180,7 +176,7 @@ public class ElasticsearchWriterTest {
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
-        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        esWriter.init(stormConf, writerConfiguration);
         BulkWriterResponse response = esWriter.write("bro", 
writerConfiguration, messages);
 
         // response should contain some successes and some failures
@@ -214,7 +210,7 @@ public class ElasticsearchWriterTest {
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
-        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        esWriter.init(stormConf, writerConfiguration);
         BulkWriterResponse response = esWriter.write("bro", 
writerConfiguration, messages);
 
         // response should only contain successes
@@ -239,7 +235,7 @@ public class ElasticsearchWriterTest {
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
-        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        esWriter.init(stormConf, writerConfiguration);
         BulkWriterResponse response = esWriter.write("bro", 
writerConfiguration, messages);
 
         // response should only contain successes
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index fd5f874..671e6b8 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -17,7 +17,7 @@
  */
 package org.apache.metron.enrichment.bolt;
 
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.storm.task.TopologyContext;
 import com.google.common.base.Joiner;
 import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 55c7b66..f8a4223 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -28,18 +28,18 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.configuration.ConfigurationType;
 import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.enrichment.cache.CacheKey;
 import org.apache.metron.enrichment.configuration.Enrichment;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.apache.metron.enrichment.utils.EnrichmentUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -214,7 +214,7 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
                       .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
                       .withMessage("Unable to find SensorEnrichmentConfig for 
sourceType: " + sourceType)
                       .addRawMessage(rawMessage);
-              ErrorUtils.handleError(collector, metronError);
+              StormErrorUtils.handleError(collector, metronError);
               continue;
             }
             config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, 
stellarContext);
@@ -239,7 +239,7 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
                       .withThrowable(e)
                       .withErrorFields(new HashSet() {{ add(field); }})
                       .addRawMessage(rawMessage);
-              ErrorUtils.handleError(collector, metronError);
+              StormErrorUtils.handleError(collector, metronError);
               continue;
             }
           }
@@ -268,7 +268,7 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
             .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
             .withThrowable(e)
             .addRawMessage(rawMessage);
-    ErrorUtils.handleError(collector, error);
+    StormErrorUtils.handleError(collector, error);
   }
 
   @Override
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
index a9263fb..ac6a1cf 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
@@ -30,12 +30,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
 import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -169,7 +169,7 @@ public abstract class JoinBolt<V> extends 
ConfiguredEnrichmentBolt {
               .withMessage("Joining problem: " + message)
               .withThrowable(e)
               .addRawMessage(message);
-      ErrorUtils.handleError(collector, error);
+      StormErrorUtils.handleError(collector, error);
       collector.ack(tuple);
     }
     perfLog.log("execute", "key={}, elapsed time to run execute", key);
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
index de69ad4..48cec0b 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
@@ -20,7 +20,7 @@ package org.apache.metron.enrichment.bolt;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.performance.PerformanceLogger;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index 00c23ff..2b19375 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import org.apache.metron.common.configuration.ConfigurationType;
 import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
 import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
index d9f57eb..dd49de2 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
@@ -20,14 +20,13 @@ package org.apache.metron.enrichment.bolt;
 import static org.apache.metron.common.Constants.STELLAR_CONTEXT_CONF;
 
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.configuration.ConfigurationType;
 import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
 import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
 import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
@@ -41,6 +40,7 @@ import 
org.apache.metron.enrichment.parallel.ConcurrencyContext;
 import org.apache.metron.enrichment.parallel.WorkerPoolStrategies;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -285,7 +285,7 @@ public class UnifiedEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
                 .withMessage(t.getValue().getMessage())
                 .withThrowable(t.getValue())
                 .addRawMessage(t.getKey());
-        ErrorUtils.handleError(collector, error);
+        StormErrorUtils.handleError(collector, error);
       }
     } catch (Exception e) {
       //If something terrible and unexpected happens then we want to send an 
error along, but this
@@ -296,7 +296,7 @@ public class UnifiedEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
               .withMessage(e.getMessage())
               .withThrowable(e)
               .addRawMessage(message);
-      ErrorUtils.handleError(collector, error);
+      StormErrorUtils.handleError(collector, error);
     }
     finally {
       collector.ack(input);
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
index 9fc8947..ed623f3 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
index bc7dace..8bdf409 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
@@ -21,7 +21,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
 import org.apache.storm.task.TopologyContext;
diff --git 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
index a05c843..aeafede 100644
--- 
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
+++ 
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
@@ -31,7 +31,7 @@ import java.util.Map;
 import org.adrianwalker.multilinestring.Multiline;
 import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import 
org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
 import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/pom.xml 
b/metron-platform/metron-parsing/metron-parsing-storm/pom.xml
index 72770c8..c936ef9 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/pom.xml
+++ b/metron-platform/metron-parsing/metron-parsing-storm/pom.xml
@@ -33,7 +33,7 @@
     <!-- Metron dependencies -->
     <dependency>
       <groupId>org.apache.metron</groupId>
-      <artifactId>metron-common</artifactId>
+      <artifactId>metron-common-storm</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
     <dependency>
diff --git 
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
 
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 21aa087..2837fcb 100644
--- 
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ 
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -30,18 +30,18 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredParserBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredParserBolt;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
 import org.apache.metron.common.message.metadata.RawMessage;
-import org.apache.metron.common.message.metadata.RawMessageUtil;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.storm.common.message.metadata.RawMessageUtil;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
 import org.apache.metron.writer.AckTuplesPolicy;
 import org.apache.metron.parsers.ParserRunner;
 import org.apache.metron.parsers.ParserRunnerResults;
@@ -255,7 +255,7 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
               , sensorParserConfig.getRawMessageStrategyConfig()
       );
       ParserRunnerResults<JSONObject> parserRunnerResults = 
parserRunner.execute(sensorType, rawMessage, parserConfigurations);
-      parserRunnerResults.getErrors().forEach(error -> 
ErrorUtils.handleError(collector, error));
+      parserRunnerResults.getErrors().forEach(error -> 
StormErrorUtils.handleError(collector, error));
 
       WriterHandler writer = sensorToWriterMap.get(sensorType);
       int numWritten = 0;
@@ -326,7 +326,7 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
             .withThrowable(ex)
             .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(originalMessage);
-    ErrorUtils.handleError(collector, error);
+    StormErrorUtils.handleError(collector, error);
   }
 
   @Override
diff --git 
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
 
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
index 6a174f3..1ee6464 100644
--- 
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
+++ 
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -21,11 +21,11 @@ package org.apache.metron.parsers.bolt;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
 import org.apache.metron.writer.AckTuplesPolicy;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -94,7 +94,7 @@ public class WriterBolt extends BaseRichBolt {
               .withThrowable(e)
               .withSensorType(Collections.singleton(sensorType))
               .addRawMessage(message);
-      ErrorUtils.handleError(collector, error);
+      StormErrorUtils.handleError(collector, error);
       collector.ack(tuple);
     }
   }
diff --git 
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
 
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
index 434db45..4f5ebe9 100644
--- 
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
+++ 
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -27,7 +27,7 @@ import 
org.apache.metron.common.configuration.writer.ConfigurationStrategy;
 import org.apache.metron.common.configuration.writer.ConfigurationsStrategies;
 import 
org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.MessageWriter;
@@ -75,7 +75,7 @@ public class WriterHandler implements Serializable {
       writerTransformer = config -> new 
SingleBatchConfigurationFacade(configStrategy.createWriterConfig(messageWriter, 
config));
     }
     try {
-      messageWriter.init(stormConf, topologyContext, 
writerTransformer.apply(configurations));
+      messageWriter.init(stormConf, writerTransformer.apply(configurations));
     } catch (Exception e) {
       throw new IllegalStateException("Unable to initialize message writer", 
e);
     }
diff --git 
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
 
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 90c882f..4240f7a 100644
--- 
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ 
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -22,7 +22,7 @@ import 
org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.metadata.RawMessage;
 import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.writer.AckTuplesPolicy;
diff --git 
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
 
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index 68fc15f..e16c5e9 100644
--- 
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++ 
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -106,7 +106,7 @@ public class WriterBoltTest extends BaseBoltTest{
     }
 
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(batchWriter, times(1)).init(any(), any(), any());
+    verify(batchWriter, times(1)).init(any(), any());
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
       bolt.execute(t);
@@ -206,7 +206,7 @@ public class WriterBoltTest extends BaseBoltTest{
 
 
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(batchWriter, times(1)).init(any(), any(), any());
+    verify(batchWriter, times(1)).init(any(), any());
 
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
@@ -264,7 +264,7 @@ public class WriterBoltTest extends BaseBoltTest{
     when(errorTuple.getValueByField(eq("message"))).thenReturn(errorMessage);
 
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(batchWriter, times(1)).init(any(), any(), any());
+    verify(batchWriter, times(1)).init(any(), any());
 
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
@@ -316,7 +316,7 @@ public class WriterBoltTest extends BaseBoltTest{
 
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
     doThrow(new Exception()).when(batchWriter).write(any(), any(), any());
-    verify(batchWriter, times(1)).init(any(), any(), any());
+    verify(batchWriter, times(1)).init(any(), any());
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
       bolt.execute(t);
diff --git 
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
 
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
index 66b6c2b..0d46d73 100644
--- 
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
+++ 
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
@@ -60,7 +60,7 @@ public class StormParserDriver extends ParserDriver {
     }
 
     @Override
-    public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration config) throws Exception {
+    public void init(Map stormConf, WriterConfiguration config) throws 
Exception {
 
     }
 
diff --git 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 6fb2b35..b23a517 100644
--- 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -50,7 +50,6 @@ import 
org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.storm.task.TopologyContext;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,7 +152,7 @@ public class SolrWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration configurations) throws IOException, SolrServerException {
+  public void init(Map stormConf, WriterConfiguration configurations) throws 
IOException, SolrServerException {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
     initializeFromGlobalConfig(globalConfiguration);
     LOG.info("Initializing SOLR writer: {}", zookeeperUrl);
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
index a4908ad..ddc7801 100644
--- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
@@ -157,7 +157,7 @@ public class SchemaValidationIntegrationTest {
         }
       };
 
-      solrWriter.init(null, null, writerConfig);
+      solrWriter.init(null, writerConfig);
 
       BulkWriterResponse response = solrWriter.write(sensorType, writerConfig, 
messages);
       Assert.assertTrue(response.getErrors().isEmpty());
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 0f8dab1..515f2f8 100644
--- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -121,7 +121,7 @@ public class SolrWriterTest {
     String collection = "metron";
     MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
     SolrWriter writer = new SolrWriter().withMetronSolrClient(solr);
-    writer.init(null, null,new IndexingWriterConfiguration("solr", 
configurations));
+    writer.init(null,new IndexingWriterConfiguration("solr", configurations));
     verify(solr, times(1)).setDefaultCollection(collection);
 
     collection = "metron2";
@@ -129,7 +129,7 @@ public class SolrWriterTest {
     globalConfig.put("solr.collection", collection);
     configurations.updateGlobalConfig(globalConfig);
     writer = new SolrWriter().withMetronSolrClient(solr);
-    writer.init(null, null, new IndexingWriterConfiguration("solr", 
configurations));
+    writer.init(null, new IndexingWriterConfiguration("solr", configurations));
     verify(solr, times(1)).setDefaultCollection(collection);
 
     writer.write("test", new IndexingWriterConfiguration("solr", 
configurations), messages);
diff --git a/metron-platform/metron-writer/pom.xml 
b/metron-platform/metron-writer/pom.xml
index 2376bff..2205cec 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -219,7 +219,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
-            <artifactId>metron-common</artifactId>
+            <artifactId>metron-common-storm</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
          <dependency>
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
index 64685bf..2543309 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
@@ -20,7 +20,7 @@ package org.apache.metron.writer;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.common.writer.MessageId;
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
index 01def5d..c81076a 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -19,7 +19,6 @@ package org.apache.metron.writer;
 
 import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.MessageId;
-import org.apache.storm.task.TopologyContext;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -130,7 +129,7 @@ public class NoopWriter extends AbstractWriter implements 
BulkMessageWriter<JSON
   }
 
   @Override
-  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception {
+  public void init(Map stormConf, WriterConfiguration config) throws Exception {
   }
 
   @Override
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
index 709875a..6b8bb2f 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -20,7 +20,6 @@ package org.apache.metron.writer;
 
 import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.MessageId;
-import org.apache.storm.task.TopologyContext;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -44,7 +43,7 @@ public class WriterToBulkWriter<MESSAGE_T> implements 
BulkMessageWriter<MESSAGE_
     this.messageWriter = messageWriter;
   }
   @Override
-  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception {
+  public void init(Map stormConf, WriterConfiguration config) throws Exception {
     messageWriter.init();
   }
 
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index c9215e3..8d38c60 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -29,21 +29,22 @@ import java.util.function.Function;
 
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredBolt;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
 import org.apache.metron.common.system.Clock;
-import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
 import org.apache.metron.writer.AckTuplesPolicy;
 import org.apache.metron.writer.BulkWriterComponent;
 import org.apache.metron.writer.WriterToBulkWriter;
+import org.apache.metron.writer.hdfs.HdfsWriter;
 import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -235,7 +236,10 @@ public class BulkMessageWriterBolt<CONFIG_T extends 
Configurations> extends Conf
       BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(maxBatchTimeout);
       bulkWriterComponent.addFlushPolicy(ackTuplesPolicy);
       setWriterComponent(bulkWriterComponent);
-      bulkMessageWriter.init(stormConf, context, writerconf);
+      bulkMessageWriter.init(stormConf, writerconf);
+      if (bulkMessageWriter instanceof HdfsWriter) {
+        ((HdfsWriter) bulkMessageWriter).initFileNameFormat(context);
+      }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -362,7 +366,7 @@ public class BulkMessageWriterBolt<CONFIG_T extends 
Configurations> extends Conf
             .withErrorType(Constants.ErrorType.INDEXING_ERROR)
             .withThrowable(e);
     collector.ack(tuple);
-    ErrorUtils.handleError(collector, error);
+    StormErrorUtils.handleError(collector, error);
   }
 
   @Override
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
index 50b11e1..d7b2cb9 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
@@ -20,7 +20,6 @@ package org.apache.metron.writer.hbase;
 
 import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.MessageId;
-import org.apache.storm.task.TopologyContext;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.hadoop.conf.Configuration;
@@ -178,7 +177,7 @@ public class SimpleHbaseEnrichmentWriter extends 
AbstractWriter implements BulkM
   }
 
   @Override
-  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configuration) throws Exception {
+  public void init(Map stormConf, WriterConfiguration configuration) throws Exception {
     if(converter == null) {
       converter = new EnrichmentConverter();
     }
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index 9e6827b..d251602 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -88,10 +88,9 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void init(Map stormConfig, TopologyContext topologyContext, WriterConfiguration configurations) {
+  public void init(Map stormConfig, WriterConfiguration configurations) {
     this.stormConfig = stormConfig;
     this.stellarProcessor = new StellarProcessor();
-    this.fileNameFormat.prepare(stormConfig,topologyContext);
     if(syncPolicy != null) {
       //if the user has specified the sync policy, we don't want to override their wishes.
       LOG.debug("Using user specified sync policy {}", syncPolicy.getClass().getSimpleName());
@@ -104,6 +103,10 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
     }
   }
 
+  public void initFileNameFormat(TopologyContext topologyContext) {
+    this.fileNameFormat.prepare(stormConfig,topologyContext);
+  }
+
   @Override
   public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, List<BulkMessage<JSONObject>> messages) throws Exception {
     BulkWriterResponse response = new BulkWriterResponse();
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index 78a27fd..a313057 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -44,7 +44,6 @@ import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.writer.AbstractWriter;
-import org.apache.storm.task.TopologyContext;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -200,7 +199,7 @@ public class KafkaWriter extends AbstractWriter implements 
BulkMessageWriter<JSO
   }
 
   @Override
-  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config)
+  public void init(Map stormConf, WriterConfiguration config)
       throws Exception {
     if(this.zkQuorum != null && this.brokerUrl == null) {
       try {
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
index 1d970cb..04efb3d 100644
--- 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
@@ -19,7 +19,7 @@ package org.apache.metron.writer;
 
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index 16f3b4f..e94608f 100644
--- 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -18,7 +18,6 @@
 package org.apache.metron.writer;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
index df80296..0fd784f 100644
--- 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
@@ -42,7 +42,7 @@ import org.apache.log4j.Level;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetters;
 import org.apache.metron.common.system.FakeClock;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkMessage;
@@ -51,7 +51,6 @@ import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.metron.writer.BulkWriterComponent;
-import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.json.simple.JSONObject;
@@ -163,7 +162,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
     
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
             new FileInputStream(sampleSensorIndexingConfigPath));
     {
-      doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
+      doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf), any(WriterConfiguration.class));
       try {
         bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
         fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception");
@@ -173,7 +172,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
     {
       when(bulkMessageWriter.getName()).thenReturn("hdfs");
       bulkMessageWriterBolt.prepare(stormConf, topologyContext, 
outputCollector);
-      verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
+      verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));
     }
     {
       for(int i = 0; i < 4; i++) {
@@ -222,7 +221,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
       Map stormConf = new HashMap();
       when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
       bulkMessageWriterBolt.prepare(stormConf, topologyContext, 
outputCollector, clock);
-      verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
+      verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));
     }
     {
       int batchTimeout = bulkMessageWriterBolt.getMaxBatchTimeout();
@@ -260,8 +259,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
       Map stormConf = new HashMap();
       when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
       bulkMessageWriterBolt.prepare(stormConf, topologyContext, 
outputCollector, clock);
-      verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class)
-              , any(WriterConfiguration.class));
+      verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));
     }
     {
       int batchTimeout = bulkMessageWriterBolt.getMaxBatchTimeout();
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 88ba4fd..e48c8e2 100644
--- 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -76,7 +76,8 @@ public class HdfsWriterTest {
   public void testGetHdfsPathNull() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(),createTopologyContext(), config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     Object result = writer.getHdfsPathExtension(SENSOR_NAME,null, message);
@@ -89,7 +90,8 @@ public class HdfsWriterTest {
   public void testGetHdfsPathEmptyString() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     Object result = writer.getHdfsPathExtension(SENSOR_NAME, "", message);
@@ -102,7 +104,8 @@ public class HdfsWriterTest {
   public void testGetHdfsPathConstant() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     Object result = writer.getHdfsPathExtension(SENSOR_NAME, "'new'", message);
@@ -115,7 +118,8 @@ public class HdfsWriterTest {
   public void testGetHdfsPathDirectVariable() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -129,7 +133,8 @@ public class HdfsWriterTest {
   public void testGetHdfsPathFormatConstant() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     Object result = writer.getHdfsPathExtension(SENSOR_NAME, 
"FORMAT('/test/folder/')", message);
@@ -143,7 +148,8 @@ public class HdfsWriterTest {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -159,7 +165,8 @@ public class HdfsWriterTest {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
     String filename = writer.fileNameFormat.getName(1,1);
     Assert.assertEquals("prefix-Xcom-7-1-1.json", filename);
     writer.close();
@@ -171,7 +178,8 @@ public class HdfsWriterTest {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -193,7 +201,8 @@ public class HdfsWriterTest {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -206,7 +215,8 @@ public class HdfsWriterTest {
   public void testGetHdfsPathNonString() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     writer.getHdfsPathExtension(SENSOR_NAME, "{'key':'value'}", message);
@@ -220,7 +230,8 @@ public class HdfsWriterTest {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
             .withMaxOpenFiles(maxFiles);
-    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     for(int i = 0; i < maxFiles; i++) {
       writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
@@ -235,7 +246,8 @@ public class HdfsWriterTest {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
                                         .withMaxOpenFiles(maxFiles);
-    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     for(int i = 0; i < maxFiles+1; i++) {
       writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
@@ -253,7 +265,8 @@ public class HdfsWriterTest {
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -296,7 +309,8 @@ public class HdfsWriterTest {
             .withExtension(".json")
             .withPrefix("prefix-");
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
-    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     // These two messages will be routed to the same folder, because test.key 
is the same
     JSONObject message = new JSONObject();
@@ -339,7 +353,8 @@ public class HdfsWriterTest {
             .withExtension(".json")
             .withPrefix("prefix-");
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
-    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     // These two messages will be routed to the same folder, because test.key 
is the same
     JSONObject message = new JSONObject();
@@ -395,7 +410,8 @@ public class HdfsWriterTest {
             .withExtension(".json")
             .withPrefix("prefix-");
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     // These two messages will be routed to the same folder, because test.key 
is the same
     JSONObject message = new JSONObject();
@@ -428,7 +444,8 @@ public class HdfsWriterTest {
     String function = "FORMAT('test-%s/%s', test.key, test.key)";
     WriterConfiguration config = buildWriterConfiguration(function);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    writer.init(new HashMap<String, String>(), config);
+    writer.initFileNameFormat(createTopologyContext());
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index af563cf..5fd4f29 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -62,6 +62,7 @@
                <module>metron-zookeeper</module>
     <module>metron-parsing</module>
     <module>metron-hbase-server</module>
+    <module>metron-common-streaming</module>
   </modules>
        <dependencies>
                <dependency>

Reply via email to