http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
index c92feab..1f300ea 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
@@ -17,9 +17,27 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import kafka.admin.AdminOperationException;
 import kafka.admin.AdminUtils$;
 import kafka.admin.RackAwareMode;
@@ -27,6 +45,8 @@ import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -43,25 +63,6 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.kafka.core.ConsumerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
 @SuppressWarnings("unchecked")
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*") // resolve classloader conflict
@@ -72,6 +73,7 @@ public class KafkaServiceImplTest {
 
   private ZkUtils zkUtils;
   private KafkaConsumer<String, String> kafkaConsumer;
+  private KafkaProducer<String, String> kafkaProducer;
   private ConsumerFactory<String, String> kafkaConsumerFactory;
   private AdminUtils$ adminUtils;
 
@@ -90,11 +92,12 @@ public class KafkaServiceImplTest {
     zkUtils = mock(ZkUtils.class);
     kafkaConsumerFactory = mock(ConsumerFactory.class);
     kafkaConsumer = mock(KafkaConsumer.class);
+    kafkaProducer = mock(KafkaProducer.class);
     adminUtils = mock(AdminUtils$.class);
 
     when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer);
 
-    kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, 
adminUtils);
+    kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, 
kafkaProducer, adminUtils);
   }
 
   @Test
@@ -304,4 +307,16 @@ public class KafkaServiceImplTest {
 
     verifyZeroInteractions(zkUtils, adminUtils);
   }
+
+  @Test
+  public void produceMessageShouldProperlyProduceMessage() throws Exception {
+    final String topicName = "t";
+    final String message = "{\"field\":\"value\"}";
+
+    kafkaService.produceMessage(topicName, message);
+
+    String expectedMessage = "{\"field\":\"value\"}";
+    verify(kafkaProducer).send(new ProducerRecord<>(topicName, 
expectedMessage));
+    verifyZeroInteractions(kafkaProducer);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
index 1ded45e..cfc24f4 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
@@ -53,6 +53,8 @@ import java.io.FileWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -99,6 +101,8 @@ public class SensorParserConfigServiceImplTest {
   @Multiline
   public static String jsonMapJson;
 
+  private String user = "user1";
+
   @Before
   public void setUp() throws Exception {
     BundleSystem.reset();
@@ -106,7 +110,10 @@ public class SensorParserConfigServiceImplTest {
     curatorFramework = mock(CuratorFramework.class);
     grokService = mock(GrokService.class);
     environment = mock(Environment.class);
+    Authentication authentication = mock(Authentication.class);
+    when(authentication.getName()).thenReturn(user);
     
when(environment.getProperty(MetronRestConstants.HDFS_METRON_APPS_ROOT)).thenReturn("./target");
+    SecurityContextHolder.getContext().setAuthentication(authentication);
     try(FileInputStream fis = new FileInputStream(new 
File("src/test/resources/zookeeper/bundle.properties"))) {
       BundleProperties properties = 
BundleProperties.createBasicBundleProperties(fis, new HashMap<>());
       
properties.setProperty(BundleProperties.BUNDLE_LIBRARY_DIRECTORY,"./target");
@@ -307,8 +314,6 @@ public class SensorParserConfigServiceImplTest {
     writer.write(grokStatement);
     writer.close();
 
-    when(grokService.saveTemporary(grokStatement, 
"squid")).thenReturn(patternFile);
-
     assertEquals(new HashMap() {{
       put("elapsed", 161);
       put("code", 200);

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/elasticsearch-shaded/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/elasticsearch-shaded/pom.xml 
b/metron-platform/elasticsearch-shaded/pom.xml
index bf02510..bbf96a0 100644
--- a/metron-platform/elasticsearch-shaded/pom.xml
+++ b/metron-platform/elasticsearch-shaded/pom.xml
@@ -89,6 +89,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>  
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-api/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/pom.xml 
b/metron-platform/metron-api/pom.xml
index 912859d..8a15251 100644
--- a/metron-platform/metron-api/pom.xml
+++ b/metron-platform/metron-api/pom.xml
@@ -221,6 +221,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml 
b/metron-platform/metron-common/pom.xml
index 390ec23..9356e13 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -403,6 +403,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index f08e9c4..2d0ccd8 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -37,7 +37,155 @@ public class SensorParserConfig implements Serializable {
   private String invalidWriterClassName;
   private Boolean readMetadata = false;
   private Boolean mergeMetadata = false;
+  private Integer numWorkers = null;
+  private Integer numAckers= null;
+  private Integer spoutParallelism = 1;
+  private Integer spoutNumTasks = 1;
+  private Integer parserParallelism = 1;
+  private Integer parserNumTasks = 1;
+  private Integer errorWriterParallelism = 1;
+  private Integer errorWriterNumTasks = 1;
+  private Map<String, Object> spoutConfig = new HashMap<>();
+  private String securityProtocol = null;
+  private Map<String, Object> stormConfig = new HashMap<>();
+
+  /**
+   * Return the number of workers for the topology.  This property will be 
used for the parser unless overridden on the CLI.
+   * @return the number of workers, or {@code null} if none has been set
+   */
+  public Integer getNumWorkers() {
+    return numWorkers;
+  }
+
+  public void setNumWorkers(Integer numWorkers) {
+    this.numWorkers = numWorkers;
+  }
+
+  /**
+   * Return the number of ackers for the topology.  This property will be used 
for the parser unless overridden on the CLI.
+   * @return the number of ackers, or {@code null} if none has been set
+   */
+  public Integer getNumAckers() {
+    return numAckers;
+  }
+
+  public void setNumAckers(Integer numAckers) {
+    this.numAckers = numAckers;
+  }
+
+  /**
+   * Return the spout parallelism.  This property will be used for the parser 
unless overridden on the CLI.
+   * @return the spout parallelism
+   */
+  public Integer getSpoutParallelism() {
+    return spoutParallelism;
+  }
+
+  public void setSpoutParallelism(Integer spoutParallelism) {
+    this.spoutParallelism = spoutParallelism;
+  }
+
+  /**
+   * Return the spout num tasks.  This property will be used for the parser 
unless overridden on the CLI.
+   * @return the number of spout tasks
+   */
+  public Integer getSpoutNumTasks() {
+    return spoutNumTasks;
+  }
+
+  public void setSpoutNumTasks(Integer spoutNumTasks) {
+    this.spoutNumTasks = spoutNumTasks;
+  }
+
+  /**
+   * Return the parser parallelism.  This property will be used for the parser 
unless overridden on the CLI.
+   * @return the parser parallelism
+   */
+  public Integer getParserParallelism() {
+    return parserParallelism;
+  }
+
+  public void setParserParallelism(Integer parserParallelism) {
+    this.parserParallelism = parserParallelism;
+  }
+
+  /**
+   * Return the parser number of tasks.  This property will be used for the 
parser unless overridden on the CLI.
+   * @return the number of parser tasks
+   */
+  public Integer getParserNumTasks() {
+    return parserNumTasks;
+  }
+
+  public void setParserNumTasks(Integer parserNumTasks) {
+    this.parserNumTasks = parserNumTasks;
+  }
+
+  /**
+   * Return the error writer bolt parallelism.  This property will be used for 
the parser unless overridden on the CLI.
+   * @return the error writer bolt parallelism
+   */
+  public Integer getErrorWriterParallelism() {
+    return errorWriterParallelism;
+  }
+
+  public void setErrorWriterParallelism(Integer errorWriterParallelism) {
+    this.errorWriterParallelism = errorWriterParallelism;
+  }
+
+  /**
+   * Return the error writer bolt number of tasks.  This property will be used 
for the parser unless overridden on the CLI.
+   * @return the number of error writer bolt tasks
+   */
+  public Integer getErrorWriterNumTasks() {
+    return errorWriterNumTasks;
+  }
+
+  public void setErrorWriterNumTasks(Integer errorWriterNumTasks) {
+    this.errorWriterNumTasks = errorWriterNumTasks;
+  }
+
+  /**
+   * Return the spout config.  This includes kafka properties.  This property 
will be used for the parser unless overridden on the CLI.
+   * @return the spout configuration map
+   */
+  public Map<String, Object> getSpoutConfig() {
+    return spoutConfig;
+  }
 
+  public void setSpoutConfig(Map<String, Object> spoutConfig) {
+    this.spoutConfig = spoutConfig;
+  }
+
+  /**
+   * Return security protocol to use.  This property will be used for the 
parser unless overridden on the CLI.
+   * The order of precedence is CLI > spout config > config in the sensor 
parser config.
+   * @return the security protocol, or {@code null} if none has been set
+   */
+  public String getSecurityProtocol() {
+    return securityProtocol;
+  }
+
+  public void setSecurityProtocol(String securityProtocol) {
+    this.securityProtocol = securityProtocol;
+  }
+
+  /**
+   * Return the Storm topology config.  This property will be used for the parser 
unless overridden on the CLI.
+   * @return the Storm topology configuration map
+   */
+  public Map<String, Object> getStormConfig() {
+    return stormConfig;
+  }
+
+  public void setStormConfig(Map<String, Object> stormConfig) {
+    this.stormConfig = stormConfig;
+  }
+
+  /**
+   * Return whether or not to merge metadata sent into the message.  If true, 
then metadata become proper fields.
+   * @return whether metadata should be merged into the message
+   */
   public Boolean getMergeMetadata() {
     return mergeMetadata;
   }
@@ -46,6 +194,10 @@ public class SensorParserConfig implements Serializable {
     this.mergeMetadata = mergeMetadata;
   }
 
+  /**
+   * Return whether or not to read metadata at all.
+   * @return whether metadata should be read
+   */
   public Boolean getReadMetadata() {
     return readMetadata;
   }
@@ -145,10 +297,21 @@ public class SensorParserConfig implements Serializable {
             ", writerClassName='" + writerClassName + '\'' +
             ", errorWriterClassName='" + errorWriterClassName + '\'' +
             ", invalidWriterClassName='" + invalidWriterClassName + '\'' +
-            ", parserConfig=" + parserConfig +
-            ", fieldTransformations=" + fieldTransformations +
             ", readMetadata=" + readMetadata +
             ", mergeMetadata=" + mergeMetadata +
+            ", numWorkers=" + numWorkers +
+            ", numAckers=" + numAckers +
+            ", spoutParallelism=" + spoutParallelism +
+            ", spoutNumTasks=" + spoutNumTasks +
+            ", parserParallelism=" + parserParallelism +
+            ", parserNumTasks=" + parserNumTasks +
+            ", errorWriterParallelism=" + errorWriterParallelism +
+            ", errorWriterNumTasks=" + errorWriterNumTasks +
+            ", spoutConfig=" + spoutConfig +
+            ", securityProtocol='" + securityProtocol + '\'' +
+            ", stormConfig=" + stormConfig +
+            ", parserConfig=" + parserConfig +
+            ", fieldTransformations=" + fieldTransformations +
             '}';
   }
 
@@ -171,12 +334,34 @@ public class SensorParserConfig implements Serializable {
       return false;
     if (getInvalidWriterClassName() != null ? 
!getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : 
that.getInvalidWriterClassName() != null)
       return false;
-    if (getParserConfig() != null ? 
!getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != 
null)
-      return false;
     if (getReadMetadata() != null ? 
!getReadMetadata().equals(that.getReadMetadata()) : that.getReadMetadata() != 
null)
       return false;
     if (getMergeMetadata() != null ? 
!getMergeMetadata().equals(that.getMergeMetadata()) : that.getMergeMetadata() 
!= null)
       return false;
+    if (getNumWorkers() != null ? 
!getNumWorkers().equals(that.getNumWorkers()) : that.getNumWorkers() != null)
+      return false;
+    if (getNumAckers() != null ? !getNumAckers().equals(that.getNumAckers()) : 
that.getNumAckers() != null)
+      return false;
+    if (getSpoutParallelism() != null ? 
!getSpoutParallelism().equals(that.getSpoutParallelism()) : 
that.getSpoutParallelism() != null)
+      return false;
+    if (getSpoutNumTasks() != null ? 
!getSpoutNumTasks().equals(that.getSpoutNumTasks()) : that.getSpoutNumTasks() 
!= null)
+      return false;
+    if (getParserParallelism() != null ? 
!getParserParallelism().equals(that.getParserParallelism()) : 
that.getParserParallelism() != null)
+      return false;
+    if (getParserNumTasks() != null ? 
!getParserNumTasks().equals(that.getParserNumTasks()) : 
that.getParserNumTasks() != null)
+      return false;
+    if (getErrorWriterParallelism() != null ? 
!getErrorWriterParallelism().equals(that.getErrorWriterParallelism()) : 
that.getErrorWriterParallelism() != null)
+      return false;
+    if (getErrorWriterNumTasks() != null ? 
!getErrorWriterNumTasks().equals(that.getErrorWriterNumTasks()) : 
that.getErrorWriterNumTasks() != null)
+      return false;
+    if (getSpoutConfig() != null ? 
!getSpoutConfig().equals(that.getSpoutConfig()) : that.getSpoutConfig() != null)
+      return false;
+    if (getSecurityProtocol() != null ? 
!getSecurityProtocol().equals(that.getSecurityProtocol()) : 
that.getSecurityProtocol() != null)
+      return false;
+    if (getStormConfig() != null ? 
!getStormConfig().equals(that.getStormConfig()) : that.getStormConfig() != null)
+      return false;
+    if (getParserConfig() != null ? 
!getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != 
null)
+      return false;
     return getFieldTransformations() != null ? 
getFieldTransformations().equals(that.getFieldTransformations()) : 
that.getFieldTransformations() == null;
 
   }
@@ -189,10 +374,21 @@ public class SensorParserConfig implements Serializable {
     result = 31 * result + (getWriterClassName() != null ? 
getWriterClassName().hashCode() : 0);
     result = 31 * result + (getErrorWriterClassName() != null ? 
getErrorWriterClassName().hashCode() : 0);
     result = 31 * result + (getInvalidWriterClassName() != null ? 
getInvalidWriterClassName().hashCode() : 0);
-    result = 31 * result + (getParserConfig() != null ? 
getParserConfig().hashCode() : 0);
-    result = 31 * result + (getFieldTransformations() != null ? 
getFieldTransformations().hashCode() : 0);
     result = 31 * result + (getReadMetadata() != null ? 
getReadMetadata().hashCode() : 0);
     result = 31 * result + (getMergeMetadata() != null ? 
getMergeMetadata().hashCode() : 0);
+    result = 31 * result + (getNumWorkers() != null ? 
getNumWorkers().hashCode() : 0);
+    result = 31 * result + (getNumAckers() != null ? getNumAckers().hashCode() 
: 0);
+    result = 31 * result + (getSpoutParallelism() != null ? 
getSpoutParallelism().hashCode() : 0);
+    result = 31 * result + (getSpoutNumTasks() != null ? 
getSpoutNumTasks().hashCode() : 0);
+    result = 31 * result + (getParserParallelism() != null ? 
getParserParallelism().hashCode() : 0);
+    result = 31 * result + (getParserNumTasks() != null ? 
getParserNumTasks().hashCode() : 0);
+    result = 31 * result + (getErrorWriterParallelism() != null ? 
getErrorWriterParallelism().hashCode() : 0);
+    result = 31 * result + (getErrorWriterNumTasks() != null ? 
getErrorWriterNumTasks().hashCode() : 0);
+    result = 31 * result + (getSpoutConfig() != null ? 
getSpoutConfig().hashCode() : 0);
+    result = 31 * result + (getSecurityProtocol() != null ? 
getSecurityProtocol().hashCode() : 0);
+    result = 31 * result + (getStormConfig() != null ? 
getStormConfig().hashCode() : 0);
+    result = 31 * result + (getParserConfig() != null ? 
getParserConfig().hashCode() : 0);
+    result = 31 * result + (getFieldTransformations() != null ? 
getFieldTransformations().hashCode() : 0);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-data-management/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/pom.xml 
b/metron-platform/metron-data-management/pom.xml
index 90c2c52..3fccc0a 100644
--- a/metron-platform/metron-data-management/pom.xml
+++ b/metron-platform/metron-data-management/pom.xml
@@ -384,7 +384,17 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
-                            
<createDependencyReducedPom>false</createDependencyReducedPom>
+                          
<createDependencyReducedPom>false</createDependencyReducedPom>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters> 
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml 
b/metron-platform/metron-elasticsearch/pom.xml
index 40989c6..0005484 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -242,6 +242,16 @@
                         <configuration>
                             
<shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                              <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                  <exclude>META-INF/*.SF</exclude>
+                                  <exclude>META-INF/*.DSA</exclude>
+                                  <exclude>META-INF/*.RSA</exclude>
+                                </excludes>
+                              </filter>
+                            </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 01c113c..0d7a76c 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -19,59 +19,59 @@ package org.apache.metron.elasticsearch.dao;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.search.*;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.Group;
+import org.apache.metron.indexing.dao.search.GroupOrder;
+import org.apache.metron.indexing.dao.search.GroupOrderType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
-import org.elasticsearch.action.get.GetRequestBuilder;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.*;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortOrder;
+import org.apache.metron.indexing.dao.update.Document;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.MultiSearchResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.mapper.ip.IpFieldMapper;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
+import org.elasticsearch.search.aggregations.metrics.sum.Sum;
+import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.*;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Date;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 public class ElasticsearchDao implements IndexDao {
   private transient TransportClient client;
@@ -115,26 +115,17 @@ public class ElasticsearchDao implements IndexDao {
             .size(searchRequest.getSize())
             .from(searchRequest.getFrom())
             .query(new QueryStringQueryBuilder(searchRequest.getQuery()))
-
             .trackScores(true);
+    searchRequest.getSort().forEach(sortField -> 
searchSourceBuilder.sort(sortField.getField(), 
getElasticsearchSortOrder(sortField.getSortOrder())));
     Optional<List<String>> fields = searchRequest.getFields();
     if (fields.isPresent()) {
       searchSourceBuilder.fields(fields.get());
     } else {
       searchSourceBuilder.fetchSource(true);
     }
-    for (SortField sortField : searchRequest.getSort()) {
-      FieldSortBuilder fieldSortBuilder = new 
FieldSortBuilder(sortField.getField());
-      if (sortField.getSortOrder() == 
org.apache.metron.indexing.dao.search.SortOrder.DESC) {
-        fieldSortBuilder.order(org.elasticsearch.search.sort.SortOrder.DESC);
-      } else {
-        fieldSortBuilder.order(org.elasticsearch.search.sort.SortOrder.ASC);
-      }
-      searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder);
-    }
     Optional<List<String>> facetFields = searchRequest.getFacetFields();
     if (facetFields.isPresent()) {
-      addFacetFields(searchSourceBuilder, facetFields.get());
+      facetFields.get().forEach(field -> searchSourceBuilder.aggregation(new 
TermsBuilder(getFacentAggregationName(field)).field(field)));
     }
     String[] wildcardIndices = searchRequest.getIndices().stream().map(index 
-> String.format("%s*", index)).toArray(value -> new 
String[searchRequest.getIndices().size()]);
     org.elasticsearch.action.search.SearchResponse elasticsearchResponse;
@@ -146,23 +137,8 @@ public class ElasticsearchDao implements IndexDao {
     }
     SearchResponse searchResponse = new SearchResponse();
     searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits());
-    
searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit
 -> {
-      SearchResult searchResult = new SearchResult();
-      searchResult.setId(searchHit.getId());
-      Map<String, Object> source;
-      if (fields.isPresent()) {
-        source = new HashMap<>();
-        searchHit.getFields().forEach((key, value) -> {
-          source.put(key, value.getValues().size() == 1 ? value.getValue() : 
value.getValues());
-        });
-      } else {
-        source = searchHit.getSource();
-      }
-      searchResult.setSource(source);
-      searchResult.setScore(searchHit.getScore());
-      searchResult.setIndex(searchHit.getIndex());
-      return searchResult;
-    }).collect(Collectors.toList()));
+    
searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit
 ->
+        getSearchResult(searchHit, 
fields.isPresent())).collect(Collectors.toList()));
     if (facetFields.isPresent()) {
       Map<String, FieldType> commonColumnMetadata;
       try {
@@ -176,6 +152,37 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   @Override
+  public GroupResponse group(GroupRequest groupRequest) throws
+      InvalidSearchException {
+    // Runs the request's query with one nested terms aggregation per
+    // requested group (see getGroupsTermBuilder) and converts the returned
+    // aggregations into a GroupResponse (see getGroupResults).
+    // Throws InvalidSearchException when the client is uninitialized, no
+    // group was supplied, the search fails, or the column metadata cannot
+    // be read.
+    if (client == null) {
+      throw new InvalidSearchException(
+          "Uninitialized Dao!  You must call init() prior to use.");
+    }
+    if (groupRequest.getGroups() == null
+        || groupRequest.getGroups().size() == 0) {
+      throw new InvalidSearchException("At least 1 group must be provided.");
+    }
+    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+    searchSourceBuilder.query(
+        new QueryStringQueryBuilder(groupRequest.getQuery()));
+    searchSourceBuilder.aggregation(getGroupsTermBuilder(groupRequest, 0));
+    // Each requested index name is searched as a prefix pattern ("name*").
+    String[] wildcardIndices = groupRequest.getIndices().stream()
+        .map(index -> String.format("%s*", index))
+        .toArray(value -> new String[groupRequest.getIndices().size()]);
+    org.elasticsearch.action.search.SearchResponse elasticsearchResponse;
+    try {
+      elasticsearchResponse = client.search(
+          new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
+              .source(searchSourceBuilder)).actionGet();
+    } catch (SearchPhaseExecutionException e) {
+      throw new InvalidSearchException("Could not execute search", e);
+    }
+    Map<String, FieldType> commonColumnMetadata;
+    try {
+      commonColumnMetadata =
+          getCommonColumnMetadata(groupRequest.getIndices());
+    } catch (IOException e) {
+      // Chain the IOException so the root cause is not lost to the caller.
+      throw new InvalidSearchException(
+          String.format("Could not get common column metadata for indices %s",
+              Arrays.toString(groupRequest.getIndices().toArray())), e);
+    }
+    GroupResponse groupResponse = new GroupResponse();
+    groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
+    groupResponse.setGroupResults(getGroupResults(groupRequest, 0,
+        elasticsearchResponse.getAggregations(), commonColumnMetadata));
+    return groupResponse;
+  }
+
+  @Override
   public synchronized void init(AccessConfig config) {
     if(this.client == null) {
       this.client = 
ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), 
config.getOptionalSettings());
@@ -330,9 +337,17 @@ public class ElasticsearchDao implements IndexDao {
     return latestIndices.values().toArray(new String[latestIndices.size()]);
   }
 
-  public void addFacetFields(SearchSourceBuilder searchSourceBuilder, 
List<String> fields) {
-    for(String field: fields) {
-      searchSourceBuilder = searchSourceBuilder.aggregation(new 
TermsBuilder(getAggregationName(field)).field(field));
+  private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder(
+      org.apache.metron.indexing.dao.search.SortOrder sortOrder) {
+    // Translates the DAO-level sort order into the Elasticsearch one; any
+    // value other than DESC (null included) maps to ascending.
+    if (sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC) {
+      return org.elasticsearch.search.sort.SortOrder.DESC;
+    }
+    return org.elasticsearch.search.sort.SortOrder.ASC;
+  }
+
+  private Order getElasticsearchGroupOrder(GroupOrder groupOrder) {
+    // Buckets are ordered either by their term or by their document count,
+    // ascending only when the requested sort order is ASC.
+    final boolean byTerm =
+        groupOrder.getGroupOrderType() == GroupOrderType.TERM;
+    final boolean ascending = groupOrder.getSortOrder() == SortOrder.ASC;
+    if (byTerm) {
+      return Order.term(ascending);
+    } else {
+      return Order.count(ascending);
     }
   }
 
@@ -340,33 +355,94 @@ public class ElasticsearchDao implements IndexDao {
     Map<String, Map<String, Long>> fieldCounts = new HashMap<>();
     for (String field: fields) {
       Map<String, Long> valueCounts = new HashMap<>();
-      Aggregation aggregation = aggregations.get(getAggregationName(field));
-      if (aggregation instanceof LongTerms) {
-        LongTerms longTerms = (LongTerms) aggregation;
-        FieldType type = commonColumnMetadata.get(field);
-        if (FieldType.IP.equals(type)) {
-          longTerms.getBuckets().stream().forEach(bucket -> 
valueCounts.put(IpFieldMapper.longToIp((Long) bucket.getKey()), 
bucket.getDocCount()));
-        } else if (FieldType.BOOLEAN.equals(type)) {
-          longTerms.getBuckets().stream().forEach(bucket -> {
-            String key = (Long) bucket.getKey() == 1 ? "true" : "false";
-            valueCounts.put(key, bucket.getDocCount());
-          });
-        } else {
-          longTerms.getBuckets().stream().forEach(bucket -> 
valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount()));
-        }
-      } else if (aggregation instanceof DoubleTerms) {
-        DoubleTerms doubleTerms = (DoubleTerms) aggregation;
-        doubleTerms.getBuckets().stream().forEach(bucket -> 
valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount()));
-      } else if (aggregation instanceof StringTerms) {
-        StringTerms stringTerms = (StringTerms) aggregation;
-        stringTerms.getBuckets().stream().forEach(bucket -> 
valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount()));
+      Aggregation aggregation = 
aggregations.get(getFacentAggregationName(field));
+      if (aggregation instanceof Terms) {
+        Terms terms = (Terms) aggregation;
+        terms.getBuckets().stream().forEach(bucket -> 
valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), 
bucket.getDocCount()));
       }
       fieldCounts.put(field, valueCounts);
     }
     return fieldCounts;
   }
 
-  private String getAggregationName(String field) {
+  /**
+   * Renders an aggregation bucket key as a string according to the type of
+   * the field it was aggregated on.
+   *
+   * @param key  the bucket key as returned by the aggregation
+   * @param type the common column type of the field, may be null
+   * @return the dotted IP for IP fields, "true"/"false" for boolean fields
+   *         (1 meaning true), otherwise the key's plain string form
+   */
+  private String formatKey(Object key, FieldType type) {
+    // Only numeric (Long) keys can be decoded as an IP or a boolean. Any
+    // other key representation falls through to its string form instead of
+    // failing with a ClassCastException.
+    if (key instanceof Long) {
+      Long numericKey = (Long) key;
+      if (FieldType.IP.equals(type)) {
+        return IpFieldMapper.longToIp(numericKey);
+      }
+      if (FieldType.BOOLEAN.equals(type)) {
+        return numericKey == 1 ? "true" : "false";
+      }
+    }
+    // String.valueOf keeps a null key from raising a NullPointerException.
+    return String.valueOf(key);
+  }
+
+  /**
+   * Builds the terms aggregation for the group at the given position and
+   * recursively nests the aggregations of all following groups inside it.
+   * When the request names a score field, a sum aggregation over that field
+   * (missing values counted as 0) is attached at this level as well.
+   */
+  private TermsBuilder getGroupsTermBuilder(GroupRequest groupRequest,
+      int index) {
+    final List<Group> allGroups = groupRequest.getGroups();
+    final Group current = allGroups.get(index);
+    final String name = getGroupByAggregationName(current.getField());
+    final TermsBuilder builder = new TermsBuilder(name)
+        .field(current.getField())
+        .size(accessConfig.getMaxSearchGroups())
+        .order(getElasticsearchGroupOrder(current.getOrder()));
+    final boolean hasChildGroup = index + 1 < allGroups.size();
+    if (hasChildGroup) {
+      builder.subAggregation(getGroupsTermBuilder(groupRequest, index + 1));
+    }
+    groupRequest.getScoreField().ifPresent(scoreField ->
+        builder.subAggregation(
+            new SumBuilder(getSumAggregationName(scoreField))
+                .field(scoreField).missing(0)));
+    return builder;
+  }
+
+  /**
+   * Converts the terms aggregation of the group at the given position into
+   * group results, descending recursively into each bucket's
+   * sub-aggregations for the groups that follow.
+   */
+  private List<GroupResult> getGroupResults(GroupRequest groupRequest,
+      int index, Aggregations aggregations,
+      Map<String, FieldType> commonColumnMetadata) {
+    final List<Group> allGroups = groupRequest.getGroups();
+    final String groupField = allGroups.get(index).getField();
+    final Terms groupTerms =
+        aggregations.get(getGroupByAggregationName(groupField));
+    final boolean hasChildGroup = index + 1 < allGroups.size();
+    final List<GroupResult> results = new ArrayList<>();
+    for (Bucket bucket : groupTerms.getBuckets()) {
+      final GroupResult result = new GroupResult();
+      result.setKey(formatKey(bucket.getKey(),
+          commonColumnMetadata.get(groupField)));
+      result.setTotal(bucket.getDocCount());
+      final Optional<String> scoreField = groupRequest.getScoreField();
+      if (scoreField.isPresent()) {
+        // The score of a bucket is the value of its sum sub-aggregation.
+        final Sum scoreSum = bucket.getAggregations()
+            .get(getSumAggregationName(scoreField.get()));
+        result.setScore(scoreSum.getValue());
+      }
+      if (hasChildGroup) {
+        result.setGroupedBy(allGroups.get(index + 1).getField());
+        result.setGroupResults(getGroupResults(groupRequest, index + 1,
+            bucket.getAggregations(), commonColumnMetadata));
+      }
+      results.add(result);
+    }
+    return results;
+  }
+
+  /**
+   * Converts one Elasticsearch hit into a SearchResult. When specific
+   * fields were requested the source map is rebuilt from the hit's fields,
+   * storing a single value directly and multiple values as a list;
+   * otherwise the hit's own source map is used.
+   */
+  private SearchResult getSearchResult(SearchHit searchHit,
+      boolean fieldsPresent) {
+    final Map<String, Object> document;
+    if (!fieldsPresent) {
+      document = searchHit.getSource();
+    } else {
+      document = new HashMap<>();
+      searchHit.getFields().forEach((name, hitField) -> document.put(name,
+          hitField.getValues().size() == 1
+              ? hitField.getValue() : hitField.getValues()));
+    }
+    final SearchResult result = new SearchResult();
+    result.setId(searchHit.getId());
+    result.setSource(document);
+    result.setScore(searchHit.getScore());
+    result.setIndex(searchHit.getIndex());
+    return result;
+  }
+
+  private String getFacentAggregationName(String field) {
+    // Name of the facet terms aggregation for a field: "<field>_count".
+    // NOTE(review): "Facent" looks like a typo for "Facet"; renaming would
+    // have to touch every caller at once.
     return String.format("%s_count", field);
   }
+
+  private String getGroupByAggregationName(String field) {
+    // Name of the terms aggregation for one grouping level: "<field>_group".
+    return field + "_group";
+  }
+
+  private String getSumAggregationName(String field) {
+    // Name of the sum aggregation over the score field: "<field>_score".
+    return field + "_score";
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index d794ac9..5de9fd2 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -50,7 +50,7 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
    *     "long_field": { "type": "long" },
    *     "timestamp" : { "type": "date" },
    *     "latitude" : { "type": "float" },
-   *     "double_field": { "type": "double" },
+   *     "score": { "type": "double" },
    *     "is_alert": { "type": "boolean" },
    *     "location_point": { "type": "geo_point" },
    *     "bro_field": { "type": "string" },
@@ -72,7 +72,7 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
    *     "long_field": { "type": "long" },
    *     "timestamp" : { "type": "date" },
    *     "latitude" : { "type": "float" },
-   *     "double_field": { "type": "double" },
+   *     "score": { "type": "double" },
    *     "is_alert": { "type": "boolean" },
    *     "location_point": { "type": "geo_point" },
    *     "snort_field": { "type": "integer" },
@@ -91,6 +91,7 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
     ret.init(
             new AccessConfig() {{
               setMaxSearchResults(100);
+              setMaxSearchGroups(100);
               setGlobalConfigSupplier( () ->
                 new HashMap<String, Object>() {{
                   put("es.clustername", "metron");

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/pom.xml 
b/metron-platform/metron-enrichment/pom.xml
index 37cb49f..dd3998b 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -94,6 +94,23 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>ch.hsr</groupId>
+            <artifactId>geohash</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.locationtech.spatial4j</groupId>
+            <artifactId>spatial4j</artifactId>
+            <version>0.6</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.vividsolutions</groupId>
+                    <artifactId>jts-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
             <groupId>com.maxmind.geoip2</groupId>
             <artifactId>geoip2</artifactId>
             <version>${geoip.version}</version>
@@ -313,6 +330,16 @@
                         <configuration>
                             
<shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.fasterxml.jackson</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java
index 0f9bf37..f5d20f7 100644
--- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.enrichment.adapters.geo;
 
+import ch.hsr.geohash.WGS84Point;
 import com.maxmind.db.CHMCache;
 import com.maxmind.geoip2.DatabaseReader;
 import com.maxmind.geoip2.exception.AddressNotFoundException;
@@ -35,11 +36,16 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.validator.routines.InetAddressValidator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +63,42 @@ public enum GeoLiteDatabase {
   private static volatile String hdfsLoc = GEO_HDFS_FILE_DEFAULT;
   private static DatabaseReader reader = null;
 
+  /**
+   * The keys of the geo-information map filled in by this database. Each
+   * constant knows the map key it stands for and can read or write that
+   * entry.
+   */
+  public enum GeoProps {
+    LOC_ID("locID"),
+    COUNTRY("country"),
+    CITY("city"),
+    POSTAL_CODE("postalCode"),
+    DMA_CODE("dmaCode"),
+    LATITUDE("latitude"),
+    LONGITUDE("longitude"),
+    LOCATION_POINT("location_point");
+
+    String simpleName;
+    Function<Map<String, String>, String> getter;
+
+    /** Creates a property that is read by a plain lookup of its key. */
+    GeoProps(String simpleName) {
+      this(simpleName, geoInfo -> geoInfo.get(simpleName));
+    }
+
+    /** Creates a property with a custom way of reading it from the map. */
+    GeoProps(String simpleName,
+             Function<Map<String, String>, String> getter) {
+      this.getter = getter;
+      this.simpleName = simpleName;
+    }
+
+    /** Returns the map key this property is stored under. */
+    public String getSimpleName() {
+      return simpleName;
+    }
+
+    /** Stores the value under this property's key. */
+    public void set(Map<String, String> map, String val) {
+      map.put(simpleName, val);
+    }
+
+    /** Reads this property from the map through its getter. */
+    public String get(Map<String, String> map) {
+      return getter.apply(map);
+    }
+  }
+
   public synchronized void updateIfNecessary(Map<String, Object> globalConfig) 
{
     // Reload database if necessary (file changes on HDFS)
     LOG.trace("[Metron] Determining if GeoIpDatabase update required");
@@ -143,24 +185,24 @@ public enum GeoLiteDatabase {
       Postal postal = cityResponse.getPostal();
       Location location = cityResponse.getLocation();
 
-      geoInfo.put("locID", convertNullToEmptyString(city.getGeoNameId()));
-      geoInfo.put("country", convertNullToEmptyString(country.getIsoCode()));
-      geoInfo.put("city", convertNullToEmptyString(city.getName()));
-      geoInfo.put("postalCode", convertNullToEmptyString(postal.getCode()));
-      geoInfo.put("dmaCode", 
convertNullToEmptyString(location.getMetroCode()));
+      GeoProps.LOC_ID.set(geoInfo, 
convertNullToEmptyString(city.getGeoNameId()));
+      GeoProps.COUNTRY.set(geoInfo, 
convertNullToEmptyString(country.getIsoCode()));
+      GeoProps.CITY.set(geoInfo, convertNullToEmptyString(city.getName()));
+      GeoProps.POSTAL_CODE.set(geoInfo, 
convertNullToEmptyString(postal.getCode()));
+      GeoProps.DMA_CODE.set(geoInfo, 
convertNullToEmptyString(location.getMetroCode()));
 
       Double latitudeRaw = location.getLatitude();
       String latitude = convertNullToEmptyString(latitudeRaw);
-      geoInfo.put("latitude", latitude);
+      GeoProps.LATITUDE.set(geoInfo, latitude);
 
       Double longitudeRaw = location.getLongitude();
       String longitude = convertNullToEmptyString(longitudeRaw);
-      geoInfo.put("longitude", longitude);
+      GeoProps.LONGITUDE.set(geoInfo, longitude);
 
       if (latitudeRaw == null || longitudeRaw == null) {
-        geoInfo.put("location_point", "");
+        GeoProps.LOCATION_POINT.set(geoInfo, "");
       } else {
-        geoInfo.put("location_point", latitude + "," + longitude);
+        GeoProps.LOCATION_POINT.set(geoInfo, latitude + "," + longitude);
       }
 
       return Optional.of(geoInfo);
@@ -174,6 +216,23 @@ public enum GeoLiteDatabase {
     return Optional.empty();
   }
 
+  /**
+   * Converts the latitude/longitude entries of a geo-information map into a
+   * point.
+   *
+   * @param geoInfo map keyed by the {@link GeoProps} simple names; may be
+   *                null
+   * @return the point, or empty when the map is null, a coordinate is
+   *         missing or blank, or the values do not form a valid lat/long
+   */
+  public Optional<WGS84Point> toPoint(Map<String, String> geoInfo) {
+    if (geoInfo == null) {
+      return Optional.empty();
+    }
+    String latitude = GeoProps.LATITUDE.get(geoInfo);
+    String longitude = GeoProps.LONGITUDE.get(geoInfo);
+    if (latitude == null || longitude == null) {
+      return Optional.empty();
+    }
+    // The lookup stores an empty string for an unknown coordinate, so a
+    // blank value means "absent" and is not worth a warning.
+    if (latitude.trim().isEmpty() || longitude.trim().isEmpty()) {
+      return Optional.empty();
+    }
+
+    try {
+      double latD = Double.parseDouble(latitude);
+      double longD = Double.parseDouble(longitude);
+      return Optional.of(new WGS84Point(latD, longD));
+    } catch (IllegalArgumentException iae) {
+      // Covers NumberFormatException (a subclass) from parsing as well as
+      // the IllegalArgumentException WGS84Point raises for coordinates
+      // outside the valid latitude/longitude range.
+      LOG.warn(String.format("Invalid lat/long: %s/%s: %s", latitude,
+          longitude, iae.getMessage()), iae);
+      return Optional.empty();
+    }
+  }
+
   protected String convertNullToEmptyString(Object raw) {
     return raw == null ? "" : String.valueOf(raw);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategies.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategies.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategies.java
new file mode 100644
index 0000000..6af214e
--- /dev/null
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategies.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.geo.hash;
+
+import ch.hsr.geohash.WGS84Point;
+import org.locationtech.spatial4j.distance.DistanceUtils;
+
+/**
+ * Ready-made distance strategies. Every constant converts both points from
+ * degrees to radians, asks spatial4j's DistanceUtils for the distance in
+ * radians and scales it by EARTH_MEAN_RADIUS_KM.
+ *
+ * The constant VICENTY is spelled without the second "n" of "Vincenty";
+ * the name is kept because the constants are public.
+ */
+public enum DistanceStrategies implements DistanceStrategy {
+  HAVERSINE((from, to) -> DistanceUtils.EARTH_MEAN_RADIUS_KM
+      * DistanceUtils.distHaversineRAD(
+          Math.toRadians(from.getLatitude()),
+          Math.toRadians(from.getLongitude()),
+          Math.toRadians(to.getLatitude()),
+          Math.toRadians(to.getLongitude()))),
+  LAW_OF_COSINES((from, to) -> DistanceUtils.EARTH_MEAN_RADIUS_KM
+      * DistanceUtils.distLawOfCosinesRAD(
+          Math.toRadians(from.getLatitude()),
+          Math.toRadians(from.getLongitude()),
+          Math.toRadians(to.getLatitude()),
+          Math.toRadians(to.getLongitude()))),
+  VICENTY((from, to) -> DistanceUtils.EARTH_MEAN_RADIUS_KM
+      * DistanceUtils.distVincentyRAD(
+          Math.toRadians(from.getLatitude()),
+          Math.toRadians(from.getLongitude()),
+          Math.toRadians(to.getLatitude()),
+          Math.toRadians(to.getLongitude())));
+
+  DistanceStrategy strat;
+
+  DistanceStrategies(DistanceStrategy strat) {
+    this.strat = strat;
+  }
+
+  /** Delegates to the strategy this constant was created with. */
+  @Override
+  public double distance(WGS84Point point1, WGS84Point point2) {
+    return strat.distance(point1, point2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategy.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategy.java
new file mode 100644
index 0000000..0303986
--- /dev/null
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.geo.hash;
+
+import ch.hsr.geohash.WGS84Point;
+
+/**
+ * A way of computing the distance between two WGS84 points.
+ */
+public interface DistanceStrategy {
+  /**
+   * @param point1 the first point
+   * @param point2 the second point
+   * @return the distance between the two points
+   */
+  double distance(WGS84Point point1, WGS84Point point2);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/GeoHashUtil.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/GeoHashUtil.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/GeoHashUtil.java
new file mode 100644
index 0000000..902eea3
--- /dev/null
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/GeoHashUtil.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.geo.hash;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+import com.google.common.collect.Iterables;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Singleton helper for working with geohashes: computing hashes, decoding
+ * them back to points, and computing distances and centroids.
+ */
+public enum GeoHashUtil {
+  INSTANCE;
+
+  /**
+   * Computes the base-32 geohash of a latitude/longitude pair.
+   *
+   * @return the hash, or empty when either coordinate is null
+   */
+  public Optional<String> computeHash(Double latitude, Double longitude,
+      int precision) {
+    if (latitude == null || longitude == null) {
+      return Optional.empty();
+    }
+    return computeHash(new WGS84Point(latitude, longitude), precision);
+  }
+
+  /**
+   * Computes the base-32 geohash of a point at the given character
+   * precision.
+   *
+   * @return the hash, or empty when the point is null
+   */
+  public Optional<String> computeHash(WGS84Point point, int precision) {
+    // Consistent with the other overloads: absent input yields empty
+    // rather than a NullPointerException.
+    if (point == null) {
+      return Optional.empty();
+    }
+    GeoHash hash = GeoHash.withCharacterPrecision(point.getLatitude(),
+        point.getLongitude(), precision);
+    return Optional.of(hash.toBase32());
+  }
+
+  /**
+   * Computes the base-32 geohash of the location held in a geo-information
+   * map, as converted by GeoLiteDatabase.toPoint.
+   *
+   * @return the hash, or empty when the map yields no point
+   */
+  public Optional<String> computeHash(Map<String, String> geoLoc,
+      int precision) {
+    Optional<WGS84Point> point = GeoLiteDatabase.INSTANCE.toPoint(geoLoc);
+    if (point.isPresent()) {
+      return computeHash(point.get(), precision);
+    }
+    else {
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Decodes a geohash string into the point it represents.
+   *
+   * @return the point, or empty when the hash is null or decodes to null
+   */
+  public Optional<WGS84Point> toPoint(String hash) {
+    if (hash == null) {
+      return Optional.empty();
+    }
+    GeoHash h = GeoHash.fromGeohashString(hash);
+    return Optional.ofNullable(h == null ? null : h.getPoint());
+  }
+
+  /** Returns the distance between two points under the given strategy. */
+  public double distance(WGS84Point point1, WGS84Point point2,
+      DistanceStrategy strategy) {
+    return strategy.distance(point1, point2);
+  }
+
+  /**
+   * Returns the centroid of the points the given hashes decode to, each
+   * counted once. Hashes that yield no point are ignored.
+   *
+   * @return the centroid, or null when no usable point remains
+   */
+  public WGS84Point centroidOfHashes(Iterable<String> hashes) {
+    Iterable<WGS84Point> points =
+        Iterables.transform(hashes, h -> toPoint(h).orElse(null));
+    return centroidOfPoints(points);
+  }
+
+  /**
+   * Returns the centroid of the given points, each with weight 1. Null
+   * points are ignored.
+   *
+   * @return the centroid, or null when no usable point remains
+   */
+  public WGS84Point centroidOfPoints(Iterable<WGS84Point> points) {
+    Iterable<WGS84Point> nonNullPoints =
+        Iterables.filter(points, p -> p != null);
+    return centroid(Iterables.transform(nonNullPoints
+                                       , p -> new AbstractMap.SimpleImmutableEntry<>(p, 1)
+                                       )
+                   );
+  }
+
+  /**
+   * Returns the weighted centroid of a set of geohashes.
+   *
+   * @param points geohash mapped to its weight
+   * @return the centroid, or null when no usable point remains
+   */
+  public WGS84Point centroidOfWeightedPoints(Map<String, Number> points) {
+
+    Iterable<Map.Entry<WGS84Point, Number>> weightedPoints =
+        Iterables.transform(points.entrySet()
+            , kv -> {
+              WGS84Point pt = toPoint(kv.getKey()).orElse(null);
+              return new AbstractMap.SimpleImmutableEntry<>(pt, kv.getValue());
+            });
+    return centroid(
+        Iterables.filter(weightedPoints, kv -> kv.getKey() != null));
+  }
+
+  /**
+   * Find the equilibrium point of a weighted set of lat/long geo points.
+   *
+   * @param points  The points and their weights (e.g. multiplicity)
+   * @return the weighted centroid, or null when there is no usable point or
+   *         the weights sum to zero
+   */
+  private WGS84Point centroid(Iterable<Map.Entry<WGS84Point, Number>> points) {
+    double x = 0d;
+    double y = 0d;
+    double z = 0d;
+    double totalWeight = 0d;
+    int n = 0;
+    /*
+     * Long/lat are not cartesian coordinates, so a simple weighted average
+     * of them is wrong: the points lie on the surface of an ellipsoid, not
+     * on a flat square, and the distance between them is measured along
+     * the surface (a great-circle arc). The strategy is therefore to:
+     * 1. Change coordinate systems from degrees on a WGS84 projection
+     *    (lat/long) to 3 dimensional cartesian coordinates on a sphere
+     *    approximating the earth.
+     * 2. Compute a weighted average of the cartesian coordinates.
+     * 3. Change the resulting centroid back from cartesian space to
+     *    lat/long.
+     *
+     * This is generally detailed at http://www.geomidpoint.com/example.html
+     */
+    for (Map.Entry<WGS84Point, Number> weightedPoint : points) {
+      WGS84Point pt = weightedPoint.getKey();
+      Number rawWeight = weightedPoint.getValue();
+      // An entry without a point or without a weight contributes nothing.
+      if (pt == null || rawWeight == null) {
+        continue;
+      }
+      double latRad = Math.toRadians(pt.getLatitude());
+      double longRad = Math.toRadians(pt.getLongitude());
+      double cosLat = Math.cos(latRad);
+      /*
+       Convert from lat/long coordinates to cartesian coordinates. The
+       cartesian coordinate system is a right-hand, rectangular,
+       three-dimensional, earth-fixed coordinate system with an origin at
+       (0, 0, 0). The Z-axis is parallel to the axis of rotation of the
+       earth. The Z-coordinate is positive toward the North pole. The X-Y
+       plane lies in the equatorial plane. The X-axis lies along the
+       intersection of the plane containing the prime meridian and the
+       equatorial plane. The X-coordinate is positive toward the
+       intersection of the prime meridian and equator.
+
+       Please see
+       https://en.wikipedia.org/wiki/Geographic_coordinate_conversion#From_geodetic_to_ECEF_coordinates
+       for more information about this coordinate transformation.
+       */
+      double ptX = cosLat * Math.cos(longRad);
+      double ptY = cosLat * Math.sin(longRad);
+      double ptZ = Math.sin(latRad);
+      double weight = rawWeight.doubleValue();
+      x += ptX*weight;
+      y += ptY*weight;
+      z += ptZ*weight;
+      n++;
+      totalWeight += weight;
+    }
+    // Without any point, or with weights that cancel out, there is no
+    // centroid; dividing by a zero total would only produce NaN
+    // coordinates.
+    if (n == 0 || totalWeight == 0d) {
+      return null;
+    }
+    //average the vector representation in cartesian space, forming the
+    //center of gravity in cartesian space
+    x /= totalWeight;
+    y /= totalWeight;
+    z /= totalWeight;
+
+    //convert the cartesian representation back to radians
+    double longitude = Math.atan2(y, x);
+    double hypotenuse = Math.sqrt(x*x + y*y);
+    double latitude = Math.atan2(z, hypotenuse);
+
+    //convert the radians back to degrees latitude and longitude.
+    return new WGS84Point(Math.toDegrees(latitude), Math.toDegrees(longitude));
+  }
+
+  /**
+   * Returns the largest pairwise distance between the points the given
+   * hashes decode to. Hashes that yield no point are ignored.
+   *
+   * @return the maximum distance, or NaN when no usable point remains
+   */
+  public double maxDistanceHashes(Iterable<String> hashes,
+      DistanceStrategy strategy) {
+    Iterable<WGS84Point> points =
+        Iterables.transform(hashes, s -> toPoint(s).orElse(null));
+    return maxDistancePoints(Iterables.filter(points, p -> p != null),
+        strategy);
+  }
+
+  /**
+   * Returns the largest pairwise distance between the given points.
+   *
+   * @return the maximum distance, or NaN when there are no points
+   */
+  public double maxDistancePoints(Iterable<WGS84Point> points,
+      DistanceStrategy strategy) {
+    // Distance is symmetric, so only the pairs (i, j) with j <= i are
+    // evaluated. The diagonal (a point against itself) is included, so a
+    // single point yields its self-distance rather than NaN.
+    int i = 0;
+    double max = Double.NaN;
+    for (WGS84Point pt1 : points) {
+      int j = 0;
+      for (WGS84Point pt2 : points) {
+        if (j > i) {
+          break;
+        }
+        double d = strategy.distance(pt1, pt2);
+        if (Double.isNaN(max) || d > max) {
+          max = d;
+        }
+        j++;
+      }
+      i++;
+    }
+    return max;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoHashFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoHashFunctions.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoHashFunctions.java
new file mode 100644
index 0000000..a1e64c5
--- /dev/null
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoHashFunctions.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.stellar;
+
+import ch.hsr.geohash.WGS84Point;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.adapters.geo.hash.DistanceStrategies;
+import org.apache.metron.enrichment.adapters.geo.hash.DistanceStrategy;
+import org.apache.metron.enrichment.adapters.geo.hash.GeoHashUtil;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class GeoHashFunctions {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Stellar function (namespace {@code GEOHASH}, name {@code TO_LATLONG})
+   * that decodes a geohash into a latitude/longitude map.
+   */
+  @Stellar(name="TO_LATLONG"
+          ,namespace="GEOHASH"
+          ,description="Compute the lat/long of a given 
[geohash](https://en.wikipedia.org/wiki/Geohash)"
+          ,params = {
+                      "hash - The 
[geohash](https://en.wikipedia.org/wiki/Geohash)"
+                    }
+          ,returns = "A map containing the latitude and longitude of the hash 
(keys \"latitude\" and \"longitude\")"
+  )
+  public static class ToLatLong implements StellarFunction {
+
+    /**
+     * Decodes the geohash passed as the first argument.
+     *
+     * @param args    the function arguments; {@code args.get(0)} is cast to
+     *                {@code String} and treated as the geohash
+     * @param context the Stellar context (unused)
+     * @return a map keyed by the simple names of
+     *         {@code GeoProps.LONGITUDE} and {@code GeoProps.LATITUDE}, or
+     *         {@code null} when no argument is given, the hash is
+     *         {@code null}, or the hash cannot be turned into a point
+     */
+    @Override
+    public Object apply(List<Object> args, Context context)
+        throws ParseException {
+      if (args.isEmpty()) {
+        return null;
+      }
+      String geohash = (String) args.get(0);
+      if (geohash == null) {
+        return null;
+      }
+
+      Optional<WGS84Point> decoded = GeoHashUtil.INSTANCE.toPoint(geohash);
+      if (!decoded.isPresent()) {
+        return null;
+      }
+      WGS84Point location = decoded.get();
+      Map<String, Object> latLong = new HashMap<>();
+      latLong.put(GeoLiteDatabase.GeoProps.LONGITUDE.getSimpleName(),
+          location.getLongitude());
+      latLong.put(GeoLiteDatabase.GeoProps.LATITUDE.getSimpleName(),
+          location.getLatitude());
+      return latLong;
+    }
+
+    /** No setup is performed; the class declares no fields. */
+    @Override
+    public void initialize(Context context) {
+      // intentionally empty
+    }
+
+    /** Always reports the function as ready. */
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  /**
+   * Stellar function (namespace {@code GEOHASH}, name {@code FROM_LATLONG})
+   * that encodes a latitude/longitude pair as a geohash.
+   */
+  @Stellar(name="FROM_LATLONG"
+          ,namespace="GEOHASH"
+          ,description="Compute 
[geohash](https://en.wikipedia.org/wiki/Geohash) given a lat/long"
+          ,params = {
+                      "latitude - The latitude",
+                      "longitude - The longitude",
+                      "character_precision? - The number of characters to use 
in the hash. Default is 12"
+                    }
+          ,returns = "A [geohash](https://en.wikipedia.org/wiki/Geohash) of 
the lat/long"
+  )
+  public static class FromLatLong implements StellarFunction {
+
+    /**
+     * Builds a geohash from the latitude and longitude arguments.
+     *
+     * @param args    {@code args.get(0)} is the latitude, {@code args.get(1)}
+     *                the longitude (both converted to {@code Double} through
+     *                {@code ConversionUtils}); the optional
+     *                {@code args.get(2)} is the character precision,
+     *                converted to {@code Integer}, defaulting to 12
+     * @param context the Stellar context (unused)
+     * @return the hash produced by {@code GeoHashUtil.computeHash}, or
+     *         {@code null} when fewer than two arguments are given, either
+     *         coordinate is {@code null}, or no hash is produced
+     */
+    @Override
+    public Object apply(List<Object> args, Context context)
+        throws ParseException {
+      if (args.size() < 2) {
+        return null;
+      }
+      Object rawLatitude = args.get(0);
+      Object rawLongitude = args.get(1);
+      if (rawLatitude == null) {
+        return null;
+      }
+      if (rawLongitude == null) {
+        return null;
+      }
+      Double latitude = ConversionUtils.convert(rawLatitude, Double.class);
+      Double longitude = ConversionUtils.convert(rawLongitude, Double.class);
+
+      // Precision is optional; fall back to 12 characters when it is absent.
+      int charPrecision = 12;
+      if (args.size() > 2) {
+        charPrecision = ConversionUtils.convert(args.get(2), Integer.class);
+      }
+      return GeoHashUtil.INSTANCE
+          .computeHash(latitude, longitude, charPrecision)
+          .orElse(null);
+    }
+
+    /** No setup is performed; the class declares no fields. */
+    @Override
+    public void initialize(Context context) {
+      // intentionally empty
+    }
+
+    /** Always reports the function as ready. */
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  /**
+   * Stellar function (namespace {@code GEOHASH}, name {@code FROM_LOC})
+   * that encodes a geo enrichment location map as a geohash.
+   */
+  @Stellar(name="FROM_LOC"
+          ,namespace="GEOHASH"
+          ,description="Compute 
[geohash](https://en.wikipedia.org/wiki/Geohash) given a geo enrichment 
location"
+          ,params = {
+                      "map - the latitude and longitude in a map (the output of 
GEO_GET)",
+                      "character_precision? - The number of characters to use 
in the hash. Default is 12"
+                    }
+          ,returns = "A [geohash](https://en.wikipedia.org/wiki/Geohash) of 
the location"
+  )
+  public static class FromLoc implements StellarFunction {
+
+    /**
+     * Builds a geohash from a location map.
+     *
+     * @param args    {@code args.get(0)} is cast to a
+     *                {@code Map<String, String>} describing the location; the
+     *                optional {@code args.get(1)} is the character precision,
+     *                converted to {@code Integer} through
+     *                {@code ConversionUtils}, defaulting to 12
+     * @param context the Stellar context (unused)
+     * @return the hash produced by {@code GeoHashUtil.computeHash}, or
+     *         {@code null} when no argument is given, the map is
+     *         {@code null}, or no hash is produced
+     */
+    @Override
+    public Object apply(List<Object> args, Context context)
+        throws ParseException {
+      if (args.size() < 1) {
+        return null;
+      }
+      Map<String, String> map = (Map<String, String>) args.get(0);
+      if (map == null) {
+        return null;
+      }
+      // Precision is optional; fall back to 12 characters when it is absent.
+      int charPrecision = 12;
+      if (args.size() > 1) {
+        charPrecision = ConversionUtils.convert(args.get(1), Integer.class);
+      }
+      Optional<String> ret =
+          GeoHashUtil.INSTANCE.computeHash(map, charPrecision);
+      return ret.orElse(null);
+    }
+
+    /** No setup is performed; the class declares no fields. */
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    /** Always reports the function as ready. */
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+
+  /**
+   * Stellar function (namespace {@code GEOHASH}, name {@code DIST}) that
+   * measures the distance between two geohashes.
+   */
+  @Stellar(name="DIST"
+          ,namespace="GEOHASH"
+          ,description="Compute the distance between 
[geohashes](https://en.wikipedia.org/wiki/Geohash)"
+          ,params = {
+                      "hash1 - The first location as a geohash",
+                      "hash2 - The second location as a geohash",
+                      "strategy? - The great circle distance strategy to use. 
One of [HAVERSINE](https://en.wikipedia.org/wiki/Haversine_formula), 
[LAW_OF_COSINES](https://en.wikipedia.org/wiki/Law_of_cosines#Using_the_distance_formula),
 or [VICENTY](https://en.wikipedia.org/wiki/Vincenty%27s_formulae).  Haversine 
is default."
+                    }
+          ,returns = "The distance in kilometers between the hashes"
+  )
+  public static class Dist implements StellarFunction {
+
+    /**
+     * Decodes both hashes and measures the distance between them.
+     *
+     * @param args    {@code args.get(0)} and {@code args.get(1)} are cast to
+     *                {@code String} and treated as geohashes; the optional
+     *                {@code args.get(2)} is cast to {@code String} and
+     *                resolved with {@code DistanceStrategies.valueOf},
+     *                defaulting to {@code DistanceStrategies.HAVERSINE}
+     * @param context the Stellar context (unused)
+     * @return the distance reported by {@code GeoHashUtil.distance};
+     *         {@code null} when fewer than two arguments are given or either
+     *         hash is {@code null}; {@code Double.NaN} when either hash
+     *         cannot be turned into a point
+     */
+    @Override
+    public Object apply(List<Object> args, Context context)
+        throws ParseException {
+      if (args.size() < 2) {
+        return null;
+      }
+      String firstHash = (String) args.get(0);
+      if (firstHash == null) {
+        return null;
+      }
+      Optional<WGS84Point> firstPoint =
+          GeoHashUtil.INSTANCE.toPoint(firstHash);
+
+      String secondHash = (String) args.get(1);
+      if (secondHash == null) {
+        return null;
+      }
+      Optional<WGS84Point> secondPoint =
+          GeoHashUtil.INSTANCE.toPoint(secondHash);
+
+      // The strategy name is resolved before the points are inspected, so an
+      // unresolvable name surfaces even when a hash failed to decode.
+      DistanceStrategy strategy = args.size() > 2
+          ? DistanceStrategies.valueOf((String) args.get(2))
+          : DistanceStrategies.HAVERSINE;
+
+      if (!firstPoint.isPresent() || !secondPoint.isPresent()) {
+        return Double.NaN;
+      }
+      return GeoHashUtil.INSTANCE.distance(
+          firstPoint.get(), secondPoint.get(), strategy);
+    }
+
+    /** No setup is performed; the class declares no fields. */
+    @Override
+    public void initialize(Context context) {
+      // intentionally empty
+    }
+
+    /** Always reports the function as ready. */
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  /**
+   * Stellar function (namespace {@code GEOHASH}, name {@code MAX_DIST})
+   * that reports the largest distance among a collection of geohashes.
+   */
+  @Stellar(name="MAX_DIST"
+          ,namespace="GEOHASH"
+          ,description="Compute the maximum distance among a list of 
[geohashes](https://en.wikipedia.org/wiki/Geohash)"
+          ,params = {
+                      "hashes - A collection of 
[geohashes](https://en.wikipedia.org/wiki/Geohash)",
+                      "strategy? - The great circle distance strategy to use. 
One of [HAVERSINE](https://en.wikipedia.org/wiki/Haversine_formula), 
[LAW_OF_COSINES](https://en.wikipedia.org/wiki/Law_of_cosines#Using_the_distance_formula),
 or [VICENTY](https://en.wikipedia.org/wiki/Vincenty%27s_formulae).  Haversine 
is default."
+                    }
+          ,returns = "The maximum distance in kilometers between any two 
locations"
+  )
+  public static class MaxDist implements StellarFunction {
+
+    /**
+     * Delegates to {@code GeoHashUtil.maxDistanceHashes} for the supplied
+     * hashes.
+     *
+     * @param args    {@code args.get(0)} is cast to {@code Iterable<String>}
+     *                and treated as the geohashes; the optional
+     *                {@code args.get(1)} is cast to {@code String} and
+     *                resolved with {@code DistanceStrategies.valueOf},
+     *                defaulting to {@code DistanceStrategies.HAVERSINE}
+     * @param context the Stellar context (unused)
+     * @return the value returned by {@code maxDistanceHashes}, or
+     *         {@code null} when no argument is given or the collection is
+     *         {@code null}
+     */
+    @Override
+    public Object apply(List<Object> args, Context context)
+        throws ParseException {
+      if (args.isEmpty()) {
+        return null;
+      }
+      Iterable<String> geohashes = (Iterable<String>) args.get(0);
+      if (geohashes == null) {
+        return null;
+      }
+      DistanceStrategy strategy = args.size() > 1
+          ? DistanceStrategies.valueOf((String) args.get(1))
+          : DistanceStrategies.HAVERSINE;
+      return GeoHashUtil.INSTANCE.maxDistanceHashes(geohashes, strategy);
+    }
+
+    /** No setup is performed; the class declares no fields. */
+    @Override
+    public void initialize(Context context) {
+      // intentionally empty
+    }
+
+    /** Always reports the function as ready. */
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  /**
+   * Stellar function (namespace {@code GEOHASH}, name {@code CENTROID})
+   * that computes the geohash of the centroid of a set of geohashes.
+   */
+  @Stellar(name="CENTROID"
+          ,namespace="GEOHASH"
+          ,description="Compute the centroid (geographic midpoint or center of 
gravity) of a set of [geohashes](https://en.wikipedia.org/wiki/Geohash)"
+          ,params = {
+                      "hashes - A collection of 
[geohashes](https://en.wikipedia.org/wiki/Geohash) or a map associating 
geohashes to numeric weights"
+                     ,"character_precision? - The number of characters to use 
in the hash. Default is 12"
+                    }
+          ,returns = "The geohash of the centroid"
+  )
+  public static class Centroid implements StellarFunction {
+
+    /**
+     * Computes the centroid of the supplied hashes and encodes it as a
+     * geohash.
+     *
+     * @param args    {@code args.get(0)} is either a {@code Map} (handed to
+     *                {@code centroidOfWeightedPoints}) or an
+     *                {@code Iterable} (handed to {@code centroidOfHashes});
+     *                the optional {@code args.get(1)} is the character
+     *                precision, converted to {@code Integer} through
+     *                {@code ConversionUtils}; the default of 12 is used when
+     *                it is absent or {@code null}
+     * @param context the Stellar context (unused)
+     * @return the geohash of the centroid, or {@code null} when no argument
+     *         is given, the first argument is {@code null} or of another
+     *         type, no centroid is produced, or no hash is produced
+     */
+    @Override
+    public Object apply(List<Object> args, Context context)
+        throws ParseException {
+      if(args.size() < 1) {
+        return null;
+      }
+      Object o1 = args.get(0);
+      if(o1 == null) {
+        return null;
+      }
+      WGS84Point centroid = null;
+      if(o1 instanceof Map) {
+        // A map is read as geohash -> weight.
+        centroid = GeoHashUtil.INSTANCE.centroidOfWeightedPoints(
+            (Map<String, Number>)o1);
+      }
+      else if(o1 instanceof Iterable) {
+        centroid = GeoHashUtil.INSTANCE.centroidOfHashes((Iterable<String>)o1);
+      }
+      if(centroid == null) {
+        return null;
+      }
+      Integer precision = 12;
+      if(args.size() > 1 && args.get(1) != null) {
+        // Convert rather than cast, matching FROM_LATLONG and FROM_LOC, so a
+        // precision that is not already an Integer does not raise a
+        // ClassCastException.
+        precision = ConversionUtils.convert(args.get(1), Integer.class);
+      }
+      return GeoHashUtil.INSTANCE.computeHash(centroid, precision)
+          .orElse(null);
+    }
+
+    /** No setup is performed; the class declares no fields. */
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    /** Always reports the function as ready. */
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+}

Reply via email to