http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java index c92feab..1f300ea 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java @@ -17,9 +17,27 @@ */ package org.apache.metron.rest.service.impl; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mock; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import kafka.admin.AdminOperationException; import kafka.admin.AdminUtils$; import kafka.admin.RackAwareMode; @@ -27,6 +45,8 @@ import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -43,25 +63,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.kafka.core.ConsumerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mock; - @SuppressWarnings("unchecked") @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") // resolve classloader conflict @@ -72,6 +73,7 @@ public class KafkaServiceImplTest { private ZkUtils zkUtils; private KafkaConsumer<String, String> kafkaConsumer; + private KafkaProducer<String, String> kafkaProducer; private ConsumerFactory<String, String> kafkaConsumerFactory; private AdminUtils$ adminUtils; @@ -90,11 +92,12 @@ public class KafkaServiceImplTest { zkUtils = mock(ZkUtils.class); kafkaConsumerFactory 
= mock(ConsumerFactory.class); kafkaConsumer = mock(KafkaConsumer.class); + kafkaProducer = mock(KafkaProducer.class); adminUtils = mock(AdminUtils$.class); when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer); - kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, adminUtils); + kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, kafkaProducer, adminUtils); } @Test @@ -304,4 +307,16 @@ public class KafkaServiceImplTest { verifyZeroInteractions(zkUtils, adminUtils); } + + @Test + public void produceMessageShouldProperlyProduceMessage() throws Exception { + final String topicName = "t"; + final String message = "{\"field\":\"value\"}"; + + kafkaService.produceMessage(topicName, message); + + String expectedMessage = "{\"field\":\"value\"}"; + verify(kafkaProducer).send(new ProducerRecord<>(topicName, expectedMessage)); + verifyNoMoreInteractions(kafkaProducer); + } }
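
The new test pins down the producer support this commit adds to KafkaServiceImpl: the service now accepts a KafkaProducer<String, String> alongside its existing collaborators, and produceMessage(topic, message) must result in exactly one unkeyed send() on that producer. The service implementation itself is not part of this digest, so the following is only a minimal sketch consistent with the test above; the class name, field names, and bare-bones constructor are assumptions, not code from the commit:

import kafka.admin.AdminUtils$;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical sketch: just enough of a KafkaServiceImpl to satisfy the test above.
public class KafkaServiceImplSketch {
  private final ZkUtils zkUtils;
  private final ConsumerFactory<String, String> kafkaConsumerFactory;
  private final KafkaProducer<String, String> kafkaProducer;
  private final AdminUtils$ adminUtils;

  public KafkaServiceImplSketch(ZkUtils zkUtils,
                                ConsumerFactory<String, String> kafkaConsumerFactory,
                                KafkaProducer<String, String> kafkaProducer,
                                AdminUtils$ adminUtils) {
    this.zkUtils = zkUtils;
    this.kafkaConsumerFactory = kafkaConsumerFactory;
    this.kafkaProducer = kafkaProducer;
    this.adminUtils = adminUtils;
  }

  public void produceMessage(String topic, String message) {
    // An unkeyed record, matching the ProducerRecord<>(topic, message) the test verifies.
    kafkaProducer.send(new ProducerRecord<>(topic, message));
  }
}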
http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java index 1ded45e..cfc24f4 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java @@ -53,6 +53,8 @@ import java.io.FileWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -99,6 +101,8 @@ public class SensorParserConfigServiceImplTest { @Multiline public static String jsonMapJson; + private String user = "user1"; + @Before public void setUp() throws Exception { BundleSystem.reset(); @@ -106,7 +110,10 @@ public class SensorParserConfigServiceImplTest { curatorFramework = mock(CuratorFramework.class); grokService = mock(GrokService.class); environment = mock(Environment.class); + Authentication authentication = mock(Authentication.class); + when(authentication.getName()).thenReturn(user); when(environment.getProperty(MetronRestConstants.HDFS_METRON_APPS_ROOT)).thenReturn("./target"); + SecurityContextHolder.getContext().setAuthentication(authentication); try(FileInputStream fis = new FileInputStream(new File("src/test/resources/zookeeper/bundle.properties"))) { BundleProperties properties = BundleProperties.createBasicBundleProperties(fis, new HashMap<>()); properties.setProperty(BundleProperties.BUNDLE_LIBRARY_DIRECTORY,"./target"); @@ -307,8 +314,6 @@ public class SensorParserConfigServiceImplTest { writer.write(grokStatement); writer.close(); - when(grokService.saveTemporary(grokStatement, "squid")).thenReturn(patternFile); - assertEquals(new HashMap() {{ put("elapsed", 161); put("code", 200); http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/elasticsearch-shaded/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/elasticsearch-shaded/pom.xml b/metron-platform/elasticsearch-shaded/pom.xml index bf02510..bbf96a0 100644 --- a/metron-platform/elasticsearch-shaded/pom.xml +++ b/metron-platform/elasticsearch-shaded/pom.xml @@ -89,6 +89,16 @@ <goal>shade</goal> </goals> <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.google.common</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-api/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-api/pom.xml b/metron-platform/metron-api/pom.xml index 912859d..8a15251 100644 --- a/metron-platform/metron-api/pom.xml +++ b/metron-platform/metron-api/pom.xml @@ -221,6 +221,16 @@ <goal>shade</goal> </goals> <configuration> + 
<filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.google.common</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-common/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml index 390ec23..9356e13 100644 --- a/metron-platform/metron-common/pom.xml +++ b/metron-platform/metron-common/pom.xml @@ -403,6 +403,16 @@ <goal>shade</goal> </goals> <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.google.common</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java index f08e9c4..2d0ccd8 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java @@ -37,7 +37,155 @@ public class SensorParserConfig implements Serializable { private String invalidWriterClassName; private Boolean readMetadata = false; private Boolean mergeMetadata = false; + private Integer numWorkers = null; + private Integer numAckers = null; + private Integer spoutParallelism = 1; + private Integer spoutNumTasks = 1; + private Integer parserParallelism = 1; + private Integer parserNumTasks = 1; + private Integer errorWriterParallelism = 1; + private Integer errorWriterNumTasks = 1; + private Map<String, Object> spoutConfig = new HashMap<>(); + private String securityProtocol = null; + private Map<String, Object> stormConfig = new HashMap<>(); + + /** + * Return the number of workers for the topology. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getNumWorkers() { + return numWorkers; + } + + public void setNumWorkers(Integer numWorkers) { + this.numWorkers = numWorkers; + } + + /** + * Return the number of ackers for the topology. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getNumAckers() { + return numAckers; + } + + public void setNumAckers(Integer numAckers) { + this.numAckers = numAckers; + } + + /** + * Return the spout parallelism. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getSpoutParallelism() { + return spoutParallelism; + } + + public void setSpoutParallelism(Integer spoutParallelism) { + this.spoutParallelism = spoutParallelism; + } + + /** + * Return the spout num tasks. This property will be used for the parser unless overridden on the CLI.
+ * @return + */ + public Integer getSpoutNumTasks() { + return spoutNumTasks; + } + + public void setSpoutNumTasks(Integer spoutNumTasks) { + this.spoutNumTasks = spoutNumTasks; + } + + /** + * Return the parser parallelism. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getParserParallelism() { + return parserParallelism; + } + + public void setParserParallelism(Integer parserParallelism) { + this.parserParallelism = parserParallelism; + } + + /** + * Return the parser number of tasks. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getParserNumTasks() { + return parserNumTasks; + } + + public void setParserNumTasks(Integer parserNumTasks) { + this.parserNumTasks = parserNumTasks; + } + + /** + * Return the error writer bolt parallelism. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getErrorWriterParallelism() { + return errorWriterParallelism; + } + + public void setErrorWriterParallelism(Integer errorWriterParallelism) { + this.errorWriterParallelism = errorWriterParallelism; + } + + /** + * Return the error writer bolt number of tasks. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getErrorWriterNumTasks() { + return errorWriterNumTasks; + } + + public void setErrorWriterNumTasks(Integer errorWriterNumTasks) { + this.errorWriterNumTasks = errorWriterNumTasks; + } + + /** + * Return the spout config. This includes Kafka properties. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Map<String, Object> getSpoutConfig() { + return spoutConfig; + } + public void setSpoutConfig(Map<String, Object> spoutConfig) { + this.spoutConfig = spoutConfig; + } + + /** + * Return the security protocol to use. This property will be used for the parser unless overridden on the CLI. + * The order of precedence is CLI > spout config > config in the sensor parser config. + * @return + */ + public String getSecurityProtocol() { + return securityProtocol; + } + + public void setSecurityProtocol(String securityProtocol) { + this.securityProtocol = securityProtocol; + } + + /** + * Return the Storm topology config. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Map<String, Object> getStormConfig() { + return stormConfig; + } + + public void setStormConfig(Map<String, Object> stormConfig) { + this.stormConfig = stormConfig; + } + + /** + * Return whether or not to merge metadata sent into the message. If true, then metadata becomes proper fields. + * @return + */ public Boolean getMergeMetadata() { return mergeMetadata; } @@ -46,6 +194,10 @@ public class SensorParserConfig implements Serializable { this.mergeMetadata = mergeMetadata; } + + /** + * Return whether or not to read metadata at all.
+ * @return + */ public Boolean getReadMetadata() { return readMetadata; } @@ -145,10 +297,21 @@ public class SensorParserConfig implements Serializable { ", writerClassName='" + writerClassName + '\'' + ", errorWriterClassName='" + errorWriterClassName + '\'' + ", invalidWriterClassName='" + invalidWriterClassName + '\'' + - ", parserConfig=" + parserConfig + - ", fieldTransformations=" + fieldTransformations + ", readMetadata=" + readMetadata + ", mergeMetadata=" + mergeMetadata + + ", numWorkers=" + numWorkers + + ", numAckers=" + numAckers + + ", spoutParallelism=" + spoutParallelism + + ", spoutNumTasks=" + spoutNumTasks + + ", parserParallelism=" + parserParallelism + + ", parserNumTasks=" + parserNumTasks + + ", errorWriterParallelism=" + errorWriterParallelism + + ", errorWriterNumTasks=" + errorWriterNumTasks + + ", spoutConfig=" + spoutConfig + + ", securityProtocol='" + securityProtocol + '\'' + + ", stormConfig=" + stormConfig + + ", parserConfig=" + parserConfig + + ", fieldTransformations=" + fieldTransformations + '}'; } @@ -171,12 +334,34 @@ public class SensorParserConfig implements Serializable { return false; if (getInvalidWriterClassName() != null ? !getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : that.getInvalidWriterClassName() != null) return false; - if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null) - return false; if (getReadMetadata() != null ? !getReadMetadata().equals(that.getReadMetadata()) : that.getReadMetadata() != null) return false; if (getMergeMetadata() != null ? !getMergeMetadata().equals(that.getMergeMetadata()) : that.getMergeMetadata() != null) return false; + if (getNumWorkers() != null ? !getNumWorkers().equals(that.getNumWorkers()) : that.getNumWorkers() != null) + return false; + if (getNumAckers() != null ? !getNumAckers().equals(that.getNumAckers()) : that.getNumAckers() != null) + return false; + if (getSpoutParallelism() != null ? !getSpoutParallelism().equals(that.getSpoutParallelism()) : that.getSpoutParallelism() != null) + return false; + if (getSpoutNumTasks() != null ? !getSpoutNumTasks().equals(that.getSpoutNumTasks()) : that.getSpoutNumTasks() != null) + return false; + if (getParserParallelism() != null ? !getParserParallelism().equals(that.getParserParallelism()) : that.getParserParallelism() != null) + return false; + if (getParserNumTasks() != null ? !getParserNumTasks().equals(that.getParserNumTasks()) : that.getParserNumTasks() != null) + return false; + if (getErrorWriterParallelism() != null ? !getErrorWriterParallelism().equals(that.getErrorWriterParallelism()) : that.getErrorWriterParallelism() != null) + return false; + if (getErrorWriterNumTasks() != null ? !getErrorWriterNumTasks().equals(that.getErrorWriterNumTasks()) : that.getErrorWriterNumTasks() != null) + return false; + if (getSpoutConfig() != null ? !getSpoutConfig().equals(that.getSpoutConfig()) : that.getSpoutConfig() != null) + return false; + if (getSecurityProtocol() != null ? !getSecurityProtocol().equals(that.getSecurityProtocol()) : that.getSecurityProtocol() != null) + return false; + if (getStormConfig() != null ? !getStormConfig().equals(that.getStormConfig()) : that.getStormConfig() != null) + return false; + if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null) + return false; return getFieldTransformations() != null ? 
getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null; } @@ -189,10 +374,21 @@ public class SensorParserConfig implements Serializable { result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0); result = 31 * result + (getErrorWriterClassName() != null ? getErrorWriterClassName().hashCode() : 0); result = 31 * result + (getInvalidWriterClassName() != null ? getInvalidWriterClassName().hashCode() : 0); - result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0); - result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0); result = 31 * result + (getReadMetadata() != null ? getReadMetadata().hashCode() : 0); result = 31 * result + (getMergeMetadata() != null ? getMergeMetadata().hashCode() : 0); + result = 31 * result + (getNumWorkers() != null ? getNumWorkers().hashCode() : 0); + result = 31 * result + (getNumAckers() != null ? getNumAckers().hashCode() : 0); + result = 31 * result + (getSpoutParallelism() != null ? getSpoutParallelism().hashCode() : 0); + result = 31 * result + (getSpoutNumTasks() != null ? getSpoutNumTasks().hashCode() : 0); + result = 31 * result + (getParserParallelism() != null ? getParserParallelism().hashCode() : 0); + result = 31 * result + (getParserNumTasks() != null ? getParserNumTasks().hashCode() : 0); + result = 31 * result + (getErrorWriterParallelism() != null ? getErrorWriterParallelism().hashCode() : 0); + result = 31 * result + (getErrorWriterNumTasks() != null ? getErrorWriterNumTasks().hashCode() : 0); + result = 31 * result + (getSpoutConfig() != null ? getSpoutConfig().hashCode() : 0); + result = 31 * result + (getSecurityProtocol() != null ? getSecurityProtocol().hashCode() : 0); + result = 31 * result + (getStormConfig() != null ? getStormConfig().hashCode() : 0); + result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0); + result = 31 * result + (getFieldTransformations() != null ? 
getFieldTransformations().hashCode() : 0); return result; } } http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-data-management/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-data-management/pom.xml b/metron-platform/metron-data-management/pom.xml index 90c2c52..3fccc0a 100644 --- a/metron-platform/metron-data-management/pom.xml +++ b/metron-platform/metron-data-management/pom.xml @@ -384,7 +384,17 @@ <goal>shade</goal> </goals> <configuration> - <createDependencyReducedPom>false</createDependencyReducedPom> + <createDependencyReducedPom>false</createDependencyReducedPom> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.google.common</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml index 40989c6..0005484 100644 --- a/metron-platform/metron-elasticsearch/pom.xml +++ b/metron-platform/metron-elasticsearch/pom.xml @@ -242,6 +242,16 @@ <configuration> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>uber</shadedClassifierName> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.google.common</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 01c113c..0d7a76c 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -19,59 +19,59 @@ package org.apache.metron.elasticsearch.dao; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.metron.common.Constants; -import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; -import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.search.*; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.Group; 
+import org.apache.metron.indexing.dao.search.GroupOrder; +import org.apache.metron.indexing.dao.search.GroupOrderType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; -import org.elasticsearch.action.get.GetRequestBuilder; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.*; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; +import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.SortOrder; +import org.apache.metron.indexing.dao.update.Document; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.mapper.ip.IpFieldMapper; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; -import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; -import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.*; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import java.io.IOException; -import java.util.Arrays; -import java.util.Date; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; public class ElasticsearchDao implements IndexDao { private transient TransportClient client; @@ -115,26 +115,17 @@ public 
class ElasticsearchDao implements IndexDao { .size(searchRequest.getSize()) .from(searchRequest.getFrom()) .query(new QueryStringQueryBuilder(searchRequest.getQuery())) - .trackScores(true); + searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder()))); Optional<List<String>> fields = searchRequest.getFields(); if (fields.isPresent()) { searchSourceBuilder.fields(fields.get()); } else { searchSourceBuilder.fetchSource(true); } - for (SortField sortField : searchRequest.getSort()) { - FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(sortField.getField()); - if (sortField.getSortOrder() == org.apache.metron.indexing.dao.search.SortOrder.DESC) { - fieldSortBuilder.order(org.elasticsearch.search.sort.SortOrder.DESC); - } else { - fieldSortBuilder.order(org.elasticsearch.search.sort.SortOrder.ASC); - } - searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder); - } Optional<List<String>> facetFields = searchRequest.getFacetFields(); if (facetFields.isPresent()) { - addFacetFields(searchSourceBuilder, facetFields.get()); + facetFields.get().forEach(field -> searchSourceBuilder.aggregation(new TermsBuilder(getFacetAggregationName(field)).field(field))); } String[] wildcardIndices = searchRequest.getIndices().stream().map(index -> String.format("%s*", index)).toArray(value -> new String[searchRequest.getIndices().size()]); org.elasticsearch.action.search.SearchResponse elasticsearchResponse; @@ -146,23 +137,8 @@ public class ElasticsearchDao implements IndexDao { } SearchResponse searchResponse = new SearchResponse(); searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits()); - searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit -> { - SearchResult searchResult = new SearchResult(); - searchResult.setId(searchHit.getId()); - Map<String, Object> source; - if (fields.isPresent()) { - source = new HashMap<>(); - searchHit.getFields().forEach((key, value) -> { - source.put(key, value.getValues().size() == 1 ? value.getValue() : value.getValues()); - }); - } else { - source = searchHit.getSource(); - } - searchResult.setSource(source); - searchResult.setScore(searchHit.getScore()); - searchResult.setIndex(searchHit.getIndex()); - return searchResult; - }).collect(Collectors.toList())); + searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit -> + getSearchResult(searchHit, fields.isPresent())).collect(Collectors.toList())); if (facetFields.isPresent()) { Map<String, FieldType> commonColumnMetadata; try { @@ -176,6 +152,37 @@ public class ElasticsearchDao implements IndexDao { } @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + if(client == null) { + throw new InvalidSearchException("Uninitialized Dao!
You must call init() prior to use."); + } + if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) { + throw new InvalidSearchException("At least 1 group must be provided."); + } + final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(new QueryStringQueryBuilder(groupRequest.getQuery())); + searchSourceBuilder.aggregation(getGroupsTermBuilder(groupRequest, 0)); + String[] wildcardIndices = groupRequest.getIndices().stream().map(index -> String.format("%s*", index)).toArray(value -> new String[groupRequest.getIndices().size()]); + org.elasticsearch.action.search.SearchResponse elasticsearchResponse; + try { + elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices) + .source(searchSourceBuilder)).actionGet(); + } catch (SearchPhaseExecutionException e) { + throw new InvalidSearchException("Could not execute search", e); + } + Map<String, FieldType> commonColumnMetadata; + try { + commonColumnMetadata = getCommonColumnMetadata(groupRequest.getIndices()); + } catch (IOException e) { + throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", Arrays.toString(groupRequest.getIndices().toArray()))); + } + GroupResponse groupResponse = new GroupResponse(); + groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); + groupResponse.setGroupResults(getGroupResults(groupRequest, 0, elasticsearchResponse.getAggregations(), commonColumnMetadata)); + return groupResponse; + } + + @Override public synchronized void init(AccessConfig config) { if(this.client == null) { this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings()); @@ -330,9 +337,17 @@ public class ElasticsearchDao implements IndexDao { return latestIndices.values().toArray(new String[latestIndices.size()]); } - public void addFacetFields(SearchSourceBuilder searchSourceBuilder, List<String> fields) { - for(String field: fields) { - searchSourceBuilder = searchSourceBuilder.aggregation(new TermsBuilder(getAggregationName(field)).field(field)); + private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder( + org.apache.metron.indexing.dao.search.SortOrder sortOrder) { + return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ? + org.elasticsearch.search.sort.SortOrder.DESC : org.elasticsearch.search.sort.SortOrder.ASC; + } + + private Order getElasticsearchGroupOrder(GroupOrder groupOrder) { + if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) { + return groupOrder.getSortOrder() == SortOrder.ASC ? Order.term(true) : Order.term(false); + } else { + return groupOrder.getSortOrder() == SortOrder.ASC ? 
Order.count(true) : Order.count(false); } } @@ -340,33 +355,94 @@ Map<String, Map<String, Long>> fieldCounts = new HashMap<>(); for (String field: fields) { Map<String, Long> valueCounts = new HashMap<>(); - Aggregation aggregation = aggregations.get(getAggregationName(field)); - if (aggregation instanceof LongTerms) { - LongTerms longTerms = (LongTerms) aggregation; - FieldType type = commonColumnMetadata.get(field); - if (FieldType.IP.equals(type)) { - longTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(IpFieldMapper.longToIp((Long) bucket.getKey()), bucket.getDocCount())); - } else if (FieldType.BOOLEAN.equals(type)) { - longTerms.getBuckets().stream().forEach(bucket -> { - String key = (Long) bucket.getKey() == 1 ? "true" : "false"; - valueCounts.put(key, bucket.getDocCount()); - }); - } else { - longTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())); - } - } else if (aggregation instanceof DoubleTerms) { - DoubleTerms doubleTerms = (DoubleTerms) aggregation; - doubleTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())); - } else if (aggregation instanceof StringTerms) { - StringTerms stringTerms = (StringTerms) aggregation; - stringTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())) + Aggregation aggregation = aggregations.get(getFacetAggregationName(field)); + if (aggregation instanceof Terms) { + Terms terms = (Terms) aggregation; + terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount())); } fieldCounts.put(field, valueCounts); } return fieldCounts; } - private String getAggregationName(String field) { + private String formatKey(Object key, FieldType type) { + if (FieldType.IP.equals(type)) { + return IpFieldMapper.longToIp((Long) key); + } else if (FieldType.BOOLEAN.equals(type)) { + return (Long) key == 1 ?
"true" : "false"; + } else { + return key.toString(); + } + } + + private TermsBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) { + List<Group> groups = groupRequest.getGroups(); + Group group = groups.get(index); + String aggregationName = getGroupByAggregationName(group.getField()); + TermsBuilder termsBuilder = new TermsBuilder(aggregationName) + .field(group.getField()) + .size(accessConfig.getMaxSearchGroups()) + .order(getElasticsearchGroupOrder(group.getOrder())); + if (index < groups.size() - 1) { + termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 1)); + } + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + termsBuilder.subAggregation(new SumBuilder(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0)); + } + return termsBuilder; + } + + private List<GroupResult> getGroupResults(GroupRequest groupRequest, int index, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { + List<Group> groups = groupRequest.getGroups(); + String field = groups.get(index).getField(); + Terms terms = aggregations.get(getGroupByAggregationName(field)); + List<GroupResult> searchResultGroups = new ArrayList<>(); + for(Bucket bucket: terms.getBuckets()) { + GroupResult groupResult = new GroupResult(); + groupResult.setKey(formatKey(bucket.getKey(), commonColumnMetadata.get(field))); + groupResult.setTotal(bucket.getDocCount()); + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + Sum score = bucket.getAggregations().get(getSumAggregationName(scoreField.get())); + groupResult.setScore(score.getValue()); + } + if (index < groups.size() - 1) { + groupResult.setGroupedBy(groups.get(index + 1).getField()); + groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, bucket.getAggregations(), commonColumnMetadata)); + } + searchResultGroups.add(groupResult); + } + return searchResultGroups; + } + + private SearchResult getSearchResult(SearchHit searchHit, boolean fieldsPresent) { + SearchResult searchResult = new SearchResult(); + searchResult.setId(searchHit.getId()); + Map<String, Object> source; + if (fieldsPresent) { + source = new HashMap<>(); + searchHit.getFields().forEach((key, value) -> { + source.put(key, value.getValues().size() == 1 ? 
value.getValue() : value.getValues()); + }); + } else { + source = searchHit.getSource(); + } + searchResult.setSource(source); + searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); + return searchResult; + } + + private String getFacetAggregationName(String field) { return String.format("%s_count", field); } + + private String getGroupByAggregationName(String field) { + return String.format("%s_group", field); + } + + private String getSumAggregationName(String field) { + return String.format("%s_score", field); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index d794ac9..5de9fd2 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -50,7 +50,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { * "long_field": { "type": "long" }, * "timestamp" : { "type": "date" }, * "latitude" : { "type": "float" }, - * "double_field": { "type": "double" }, + * "score": { "type": "double" }, * "is_alert": { "type": "boolean" }, * "location_point": { "type": "geo_point" }, * "bro_field": { "type": "string" }, @@ -72,7 +72,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { * "long_field": { "type": "long" }, * "timestamp" : { "type": "date" }, * "latitude" : { "type": "float" }, - * "double_field": { "type": "double" }, + * "score": { "type": "double" }, * "is_alert": { "type": "boolean" }, * "location_point": { "type": "geo_point" }, * "snort_field": { "type": "integer" }, @@ -91,6 +91,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { ret.init( new AccessConfig() {{ setMaxSearchResults(100); + setMaxSearchGroups(100); setGlobalConfigSupplier( () -> new HashMap<String, Object>() {{ put("es.clustername", "metron"); http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml index 37cb49f..dd3998b 100644 --- a/metron-platform/metron-enrichment/pom.xml +++ b/metron-platform/metron-enrichment/pom.xml @@ -94,6 +94,23 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>ch.hsr</groupId> + <artifactId>geohash</artifactId> + <version>1.3.0</version> + </dependency> + <dependency> + <groupId>org.locationtech.spatial4j</groupId> + <artifactId>spatial4j</artifactId> + <version>0.6</version> + <exclusions> + <exclusion> + <groupId>com.vividsolutions</groupId> + <artifactId>jts-core</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>com.maxmind.geoip2</groupId> <artifactId>geoip2</artifactId> <version>${geoip.version}</version> @@ -313,6 +330,16 @@ <configuration>
<shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>uber</shadedClassifierName> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.fasterxml.jackson</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java index 0f9bf37..f5d20f7 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabase.java @@ -17,6 +17,7 @@ */ package org.apache.metron.enrichment.adapters.geo; +import ch.hsr.geohash.WGS84Point; import com.maxmind.db.CHMCache; import com.maxmind.geoip2.DatabaseReader; import com.maxmind.geoip2.exception.AddressNotFoundException; @@ -35,11 +36,16 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.zip.GZIPInputStream; + +import org.apache.commons.lang3.StringUtils; import org.apache.commons.validator.routines.InetAddressValidator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.metron.stellar.common.utils.ConversionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +63,42 @@ public enum GeoLiteDatabase { private static volatile String hdfsLoc = GEO_HDFS_FILE_DEFAULT; private static DatabaseReader reader = null; + public enum GeoProps { + LOC_ID("locID"), + COUNTRY("country"), + CITY("city"), + POSTAL_CODE("postalCode"), + DMA_CODE("dmaCode"), + LATITUDE("latitude"), + LONGITUDE("longitude"), + LOCATION_POINT("location_point"), + ; + Function<Map<String, String>, String> getter; + String simpleName; + + GeoProps(String simpleName) { + this(simpleName, m -> m.get(simpleName)); + } + + GeoProps(String simpleName, + Function<Map<String, String>, String> getter + ) { + this.simpleName = simpleName; + this.getter = getter; + } + public String getSimpleName() { + return simpleName; + } + + public String get(Map<String, String> map) { + return getter.apply(map); + } + + public void set(Map<String, String> map, String val) { + map.put(simpleName, val); + } + } + public synchronized void updateIfNecessary(Map<String, Object> globalConfig) { // Reload database if necessary (file changes on HDFS) LOG.trace("[Metron] Determining if GeoIpDatabase update required"); @@ -143,24 +185,24 @@ public enum GeoLiteDatabase { Postal postal = cityResponse.getPostal(); Location location = cityResponse.getLocation(); - geoInfo.put("locID", convertNullToEmptyString(city.getGeoNameId())); - geoInfo.put("country", convertNullToEmptyString(country.getIsoCode())); - geoInfo.put("city", convertNullToEmptyString(city.getName())); - geoInfo.put("postalCode", convertNullToEmptyString(postal.getCode())); - 
geoInfo.put("dmaCode", convertNullToEmptyString(location.getMetroCode())); + GeoProps.LOC_ID.set(geoInfo, convertNullToEmptyString(city.getGeoNameId())); + GeoProps.COUNTRY.set(geoInfo, convertNullToEmptyString(country.getIsoCode())); + GeoProps.CITY.set(geoInfo, convertNullToEmptyString(city.getName())); + GeoProps.POSTAL_CODE.set(geoInfo, convertNullToEmptyString(postal.getCode())); + GeoProps.DMA_CODE.set(geoInfo, convertNullToEmptyString(location.getMetroCode())); Double latitudeRaw = location.getLatitude(); String latitude = convertNullToEmptyString(latitudeRaw); - geoInfo.put("latitude", latitude); + GeoProps.LATITUDE.set(geoInfo, latitude); Double longitudeRaw = location.getLongitude(); String longitude = convertNullToEmptyString(longitudeRaw); - geoInfo.put("longitude", longitude); + GeoProps.LONGITUDE.set(geoInfo, longitude); if (latitudeRaw == null || longitudeRaw == null) { - geoInfo.put("location_point", ""); + GeoProps.LOCATION_POINT.set(geoInfo, ""); } else { - geoInfo.put("location_point", latitude + "," + longitude); + GeoProps.LOCATION_POINT.set(geoInfo, latitude + "," + longitude); } return Optional.of(geoInfo); @@ -174,6 +216,23 @@ public enum GeoLiteDatabase { return Optional.empty(); } + public Optional<WGS84Point> toPoint(Map<String, String> geoInfo) { + String latitude = GeoProps.LATITUDE.get(geoInfo); + String longitude = GeoProps.LONGITUDE.get(geoInfo); + if(latitude == null || longitude == null) { + return Optional.empty(); + } + + try { + double latD = Double.parseDouble(latitude.toString()); + double longD = Double.parseDouble(longitude.toString()); + return Optional.of(new WGS84Point(latD, longD)); + } catch (NumberFormatException nfe) { + LOG.warn(String.format("Invalid lat/long: %s/%s: %s", latitude, longitude, nfe.getMessage()), nfe); + return Optional.empty(); + } + } + protected String convertNullToEmptyString(Object raw) { return raw == null ? "" : String.valueOf(raw); } http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategies.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategies.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategies.java new file mode 100644 index 0000000..6af214e --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategies.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.enrichment.adapters.geo.hash; + +import ch.hsr.geohash.WGS84Point; +import org.locationtech.spatial4j.distance.DistanceUtils; + +public enum DistanceStrategies implements DistanceStrategy { + HAVERSINE((p1, p2) -> DistanceUtils.EARTH_MEAN_RADIUS_KM*DistanceUtils.distHaversineRAD( Math.toRadians(p1.getLatitude()), Math.toRadians(p1.getLongitude()) + , Math.toRadians(p2.getLatitude()), Math.toRadians(p2.getLongitude()) + ) + ), + LAW_OF_COSINES((p1, p2) -> DistanceUtils.EARTH_MEAN_RADIUS_KM*DistanceUtils.distLawOfCosinesRAD( Math.toRadians(p1.getLatitude()), Math.toRadians(p1.getLongitude()) + , Math.toRadians(p2.getLatitude()), Math.toRadians(p2.getLongitude()) + ) + ), + VICENTY((p1, p2) -> DistanceUtils.EARTH_MEAN_RADIUS_KM*DistanceUtils.distVincentyRAD( Math.toRadians(p1.getLatitude()), Math.toRadians(p1.getLongitude()) + , Math.toRadians(p2.getLatitude()), Math.toRadians(p2.getLongitude()) + ) + ) + ; + DistanceStrategy strat; + DistanceStrategies(DistanceStrategy strat) { + this.strat = strat; + } + + @Override + public double distance(WGS84Point point1, WGS84Point point2) { + return strat.distance(point1, point2); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategy.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategy.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategy.java new file mode 100644 index 0000000..0303986 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/DistanceStrategy.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.enrichment.adapters.geo.hash; + +import ch.hsr.geohash.WGS84Point; + +public interface DistanceStrategy { + public double distance(WGS84Point point1, WGS84Point point2); +} http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/GeoHashUtil.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/GeoHashUtil.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/GeoHashUtil.java new file mode 100644 index 0000000..902eea3 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/hash/GeoHashUtil.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.adapters.geo.hash; + +import ch.hsr.geohash.GeoHash; +import ch.hsr.geohash.WGS84Point; +import com.google.common.collect.Iterables; +import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.Optional; + +public enum GeoHashUtil { + INSTANCE; + + public Optional<String> computeHash(Double latitude, Double longitude, int precision) { + if(latitude == null || longitude == null) { + return Optional.empty(); + } + return computeHash(new WGS84Point(latitude, longitude), precision); + } + + public Optional<String> computeHash(WGS84Point point, int precision) { + GeoHash hash = GeoHash.withCharacterPrecision(point.getLatitude(), point.getLongitude(), precision); + return Optional.of(hash.toBase32()); + } + + public Optional<String> computeHash(Map<String, String> geoLoc, int precision) { + Optional<WGS84Point> point = GeoLiteDatabase.INSTANCE.toPoint(geoLoc); + if(point.isPresent()) { + return computeHash(point.get(), precision); + } + else { + return Optional.empty(); + } + } + + public Optional<WGS84Point> toPoint(String hash) { + if(hash == null) { + return Optional.empty(); + } + GeoHash h = GeoHash.fromGeohashString(hash); + return Optional.ofNullable(h == null?null:h.getPoint()); + } + + public double distance(WGS84Point point1, WGS84Point point2, DistanceStrategy strategy) { + return strategy.distance(point1, point2); + } + + public WGS84Point centroidOfHashes(Iterable<String> hashes) { + Iterable<WGS84Point> points = Iterables.transform(hashes, h -> toPoint(h).orElse(null)); + return centroidOfPoints(points); + } + + public WGS84Point centroidOfPoints(Iterable<WGS84Point> points) { + Iterable<WGS84Point> nonNullPoints = Iterables.filter(points, p -> p != null); + return 
centroid(Iterables.transform(nonNullPoints + , p -> new AbstractMap.SimpleImmutableEntry<>(p, 1) + ) + ); + } + + public WGS84Point centroidOfWeightedPoints(Map<String, Number> points) { + + Iterable<Map.Entry<WGS84Point, Number>> weightedPoints = Iterables.transform(points.entrySet() + , kv -> { + WGS84Point pt = toPoint(kv.getKey()).orElse(null); + return new AbstractMap.SimpleImmutableEntry<>(pt, kv.getValue()); + }); + return centroid(Iterables.filter(weightedPoints, kv -> kv.getKey() != null)); + } + + /** + * Find the equilibrium point of a weighted set of lat/long geo points. + * @param points The points and their weights (e.g. multiplicity) + * @return + */ + private WGS84Point centroid(Iterable<Map.Entry<WGS84Point, Number>> points) { + double x = 0d + , y = 0d + , z = 0d + , totalWeight = 0d + ; + int n = 0; + /** + * So, it's first important to realize that long/lat are not cartesian, so simple weighted averaging + * is insufficient here as it ignores the fact that we're not living on a flat square, but rather the surface of + * an ellipsoid. A crow, for instance, does not fly a straight line to an observer outside of Earth, but + * rather flies across the arc tracing the surface of earth, or a "great-earth arc". When computing the centroid + * you want to find the centroid of the points with distance defined as the great-earth arc. + * + * The general strategy is to: + * 1. Change coordinate systems from degrees on a WGS84 projection (e.g. lat/long) + * to a 3 dimensional cartesian surface atop a sphere approximating the earth. + * 2. Compute a weighted average of the cartesian coordinates + * 3. Change coordinate systems of the resulting centroid in cartesian space back to lat/long + * + * This is generally detailed at http://www.geomidpoint.com/example.html + */ + for(Map.Entry<WGS84Point, Number> weightedPoint : points) { + WGS84Point pt = weightedPoint.getKey(); + if(pt == null) { + continue; + } + double latRad = Math.toRadians(pt.getLatitude()); + double longRad = Math.toRadians(pt.getLongitude()); + double cosLat = Math.cos(latRad); + /* Convert from lat/long coordinates to cartesian coordinates. The cartesian coordinate system is a right-hand, rectangular, three-dimensional, earth-fixed coordinate system with an origin at (0, 0, 0). The Z-axis is parallel to the axis of rotation of the earth. The Z-coordinate is positive toward the North pole. The X-Y plane lies in the equatorial plane. The X-axis lies along the intersection of the plane containing the prime meridian and the equatorial plane. The X-coordinate is positive toward the intersection of the prime meridian and equator. Please see https://en.wikipedia.org/wiki/Geographic_coordinate_conversion#From_geodetic_to_ECEF_coordinates for more information about this coordinate transformation.
+ */ + double ptX = cosLat * Math.cos(longRad); + double ptY = cosLat * Math.sin(longRad); + double ptZ = Math.sin(latRad); + double weight = weightedPoint.getValue().doubleValue(); + x += ptX*weight; + y += ptY*weight; + z += ptZ*weight; + n++; + totalWeight += weight; + } + if(n == 0) { + return null; + } + //average the vector representation in cartesian space, forming the center of gravity in cartesian space + x /= totalWeight; + y /= totalWeight; + z /= totalWeight; + + //convert the cartesian representation back to radians + double longitude = Math.atan2(y, x); + double hypotenuse = Math.sqrt(x*x + y*y); + double latitude = Math.atan2(z, hypotenuse); + + //convert the radians back to degrees latitude and longitude. + return new WGS84Point(Math.toDegrees(latitude), Math.toDegrees(longitude)); + } + + public double maxDistanceHashes(Iterable<String> hashes, DistanceStrategy strategy) { + Iterable<WGS84Point> points = Iterables.transform(hashes, s -> toPoint(s).orElse(null)); + return maxDistancePoints(Iterables.filter(points, p -> p != null), strategy); + } + + public double maxDistancePoints(Iterable<WGS84Point> points, DistanceStrategy strategy) { + //Note: because distance is commutative, we need only search the upper triangle + int i = 0; + double max = Double.NaN; + for(WGS84Point pt1 : points) { + int j = 0; + for(WGS84Point pt2 : points) { + if(j <= i) { + double d = strategy.distance(pt1, pt2); + if(Double.isNaN(max) || d > max) { + max = d; + } + j++; + } + else { + break; + } + } + i++; + } + return max; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoHashFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoHashFunctions.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoHashFunctions.java new file mode 100644 index 0000000..a1e64c5 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoHashFunctions.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+package org.apache.metron.enrichment.stellar;
+
+import ch.hsr.geohash.WGS84Point;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.adapters.geo.hash.DistanceStrategies;
+import org.apache.metron.enrichment.adapters.geo.hash.DistanceStrategy;
+import org.apache.metron.enrichment.adapters.geo.hash.GeoHashUtil;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class GeoHashFunctions {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Stellar(name="TO_LATLONG"
+          ,namespace="GEOHASH"
+          ,description="Compute the lat/long of a given [geohash](https://en.wikipedia.org/wiki/Geohash)"
+          ,params = {
+                      "hash - The [geohash](https://en.wikipedia.org/wiki/Geohash)"
+                    }
+          ,returns = "A map containing the latitude and longitude of the hash (keys \"latitude\" and \"longitude\")"
+          )
+  public static class ToLatLong implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(args.size() < 1) {
+        return null;
+      }
+      String hash = (String)args.get(0);
+      if(hash == null) {
+        return null;
+      }
+
+      Optional<WGS84Point> point = GeoHashUtil.INSTANCE.toPoint(hash);
+      if(point.isPresent()) {
+        Map<String, Object> ret = new HashMap<>();
+        ret.put(GeoLiteDatabase.GeoProps.LONGITUDE.getSimpleName(), point.get().getLongitude());
+        ret.put(GeoLiteDatabase.GeoProps.LATITUDE.getSimpleName(), point.get().getLatitude());
+        return ret;
+      }
+      return null;
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(name="FROM_LATLONG"
+          ,namespace="GEOHASH"
+          ,description="Compute [geohash](https://en.wikipedia.org/wiki/Geohash) given a lat/long"
+          ,params = {
+                      "latitude - The latitude",
+                      "longitude - The longitude",
+                      "character_precision? - The number of characters to use in the hash. Default is 12"
+                    }
+          ,returns = "A [geohash](https://en.wikipedia.org/wiki/Geohash) of the lat/long"
+          )
+  public static class FromLatLong implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(args.size() < 2) {
+        return null;
+      }
+      Object latObj = args.get(0);
+      Object longObj = args.get(1);
+      if(latObj == null || longObj == null) {
+        return null;
+      }
+      Double latitude = ConversionUtils.convert(latObj, Double.class);
+      Double longitude = ConversionUtils.convert(longObj, Double.class);
+      int charPrecision = 12;
+      if(args.size() > 2) {
+        charPrecision = ConversionUtils.convert(args.get(2), Integer.class);
+      }
+      Optional<String> ret = GeoHashUtil.INSTANCE.computeHash(latitude, longitude, charPrecision);
+      return ret.orElse(null);
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(name="FROM_LOC"
+          ,namespace="GEOHASH"
+          ,description="Compute [geohash](https://en.wikipedia.org/wiki/Geohash) given a geo enrichment location"
+          ,params = {
+                      "map - the latitude and longitude in a map (the output of GEO_GET)",
+                      "character_precision? - The number of characters to use in the hash. Default is 12"
+                    }
+          ,returns = "A [geohash](https://en.wikipedia.org/wiki/Geohash) of the location"
+          )
+  public static class FromLoc implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(args.size() < 1) {
+        return null;
+      }
+      Map<String, String> map = (Map<String, String>) args.get(0);
+      if(map == null) {
+        return null;
+      }
+      int charPrecision = 12;
+      if(args.size() > 1) {
+        charPrecision = ConversionUtils.convert(args.get(1), Integer.class);
+      }
+      Optional<String> ret = GeoHashUtil.INSTANCE.computeHash(map, charPrecision);
+      return ret.orElse(null);
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+
+  @Stellar(name="DIST"
+          ,namespace="GEOHASH"
+          ,description="Compute the distance between [geohashes](https://en.wikipedia.org/wiki/Geohash)"
+          ,params = {
+                      "hash1 - The first location as a geohash",
+                      "hash2 - The second location as a geohash",
+                      "strategy? - The great circle distance strategy to use. One of [HAVERSINE](https://en.wikipedia.org/wiki/Haversine_formula), [LAW_OF_COSINES](https://en.wikipedia.org/wiki/Law_of_cosines#Using_the_distance_formula), or [VICENTY](https://en.wikipedia.org/wiki/Vincenty%27s_formulae). Haversine is default."
+                    }
+          ,returns = "The distance in kilometers between the hashes"
+          )
+  public static class Dist implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(args.size() < 2) {
+        return null;
+      }
+      String hash1 = (String)args.get(0);
+      if(hash1 == null) {
+        return null;
+      }
+      Optional<WGS84Point> pt1 = GeoHashUtil.INSTANCE.toPoint(hash1);
+      String hash2 = (String)args.get(1);
+      if(hash2 == null) {
+        return null;
+      }
+      Optional<WGS84Point> pt2 = GeoHashUtil.INSTANCE.toPoint(hash2);
+      DistanceStrategy strat = DistanceStrategies.HAVERSINE;
+      if(args.size() > 2) {
+        strat = DistanceStrategies.valueOf((String) args.get(2));
+      }
+      if(pt1.isPresent() && pt2.isPresent()) {
+        return GeoHashUtil.INSTANCE.distance(pt1.get(), pt2.get(), strat);
+      }
+      return Double.NaN;
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(name="MAX_DIST"
+          ,namespace="GEOHASH"
+          ,description="Compute the maximum distance among a list of [geohashes](https://en.wikipedia.org/wiki/Geohash)"
+          ,params = {
+                      "hashes - A collection of [geohashes](https://en.wikipedia.org/wiki/Geohash)",
+                      "strategy? - The great circle distance strategy to use. One of [HAVERSINE](https://en.wikipedia.org/wiki/Haversine_formula), [LAW_OF_COSINES](https://en.wikipedia.org/wiki/Law_of_cosines#Using_the_distance_formula), or [VICENTY](https://en.wikipedia.org/wiki/Vincenty%27s_formulae). Haversine is default."
+                    }
+          ,returns = "The maximum distance in kilometers between any two locations"
+          )
+  public static class MaxDist implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(args.size() < 1) {
+        return null;
+      }
+      Iterable<String> hashes = (Iterable<String>)args.get(0);
+      if(hashes == null) {
+        return null;
+      }
+      DistanceStrategy strat = DistanceStrategies.HAVERSINE;
+      if(args.size() > 1) {
+        strat = DistanceStrategies.valueOf((String) args.get(1));
+      }
+      return GeoHashUtil.INSTANCE.maxDistanceHashes(hashes, strat);
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(name="CENTROID"
+          ,namespace="GEOHASH"
+          ,description="Compute the centroid (geographic midpoint or center of gravity) of a set of [geohashes](https://en.wikipedia.org/wiki/Geohash)"
+          ,params = {
+                      "hashes - A collection of [geohashes](https://en.wikipedia.org/wiki/Geohash) or a map associating geohashes to numeric weights"
+                     ,"character_precision? - The number of characters to use in the hash. Default is 12"
+                    }
+          ,returns = "The geohash of the centroid"
+          )
+  public static class Centroid implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(args.size() < 1) {
+        return null;
+      }
+      Object o1 = args.get(0);
+      if(o1 == null) {
+        return null;
+      }
+      WGS84Point centroid = null;
+      if(o1 instanceof Map) {
+        centroid = GeoHashUtil.INSTANCE.centroidOfWeightedPoints((Map<String, Number>)o1);
+      }
+      else if(o1 instanceof Iterable) {
+        centroid = GeoHashUtil.INSTANCE.centroidOfHashes((Iterable<String>)o1);
+      }
+      if(centroid == null) {
+        return null;
+      }
+      Integer precision = 12;
+      if(args.size() > 1) {
+        precision = ConversionUtils.convert(args.get(1), Integer.class);
+      }
+      return GeoHashUtil.INSTANCE.computeHash(centroid, precision).orElse(null);
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+}
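
To make the coordinate-conversion arithmetic in centroid() above concrete, here is a minimal, self-contained sketch of the same approach (lat/long to unit-sphere cartesian, weighted mean, back to lat/long). The class and method names below are illustrative only and are not part of this patch:

    public class CentroidSketch {

      // Returns {latitude, longitude} in degrees for the weighted centroid of the inputs.
      // latLongDegrees holds {lat, long} pairs; weights holds one weight per pair.
      public static double[] centroid(double[][] latLongDegrees, double[] weights) {
        double x = 0, y = 0, z = 0, totalWeight = 0;
        for (int i = 0; i < latLongDegrees.length; ++i) {
          double latRad = Math.toRadians(latLongDegrees[i][0]);
          double longRad = Math.toRadians(latLongDegrees[i][1]);
          double cosLat = Math.cos(latRad);
          // project onto the unit sphere and accumulate the weighted vector sum
          x += cosLat * Math.cos(longRad) * weights[i];
          y += cosLat * Math.sin(longRad) * weights[i];
          z += Math.sin(latRad) * weights[i];
          totalWeight += weights[i];
        }
        x /= totalWeight;
        y /= totalWeight;
        z /= totalWeight;
        // convert the cartesian center of gravity back to geodetic coordinates
        double longitude = Math.atan2(y, x);
        double latitude = Math.atan2(z, Math.sqrt(x * x + y * y));
        return new double[] { Math.toDegrees(latitude), Math.toDegrees(longitude) };
      }

      public static void main(String[] args) {
        // Sanity check: two equally weighted points on the equator, 90 degrees apart,
        // should have their midpoint at (0, 45).
        double[] mid = centroid(new double[][] {{0, 0}, {0, 90}}, new double[] {1, 1});
        System.out.printf("lat=%.4f, long=%.4f%n", mid[0], mid[1]);  // lat=0.0000, long=45.0000
      }
    }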
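The half-matrix search in maxDistancePoints can be easier to follow with indexed access: since distance is symmetric, only pairs with j <= i need to be examined, roughly n^2/2 distance computations for n points. A behavior-equivalent sketch, reusing the WGS84Point and DistanceStrategy types from this patch (the class and method names are illustrative):

    import java.util.List;
    import ch.hsr.geohash.WGS84Point;
    import org.apache.metron.enrichment.adapters.geo.hash.DistanceStrategy;

    class MaxDistanceSketch {
      // Same result as GeoHashUtil.maxDistancePoints for list inputs; returns NaN for an empty list.
      static double maxDistance(List<WGS84Point> pts, DistanceStrategy strategy) {
        double max = Double.NaN;
        for (int i = 0; i < pts.size(); ++i) {
          for (int j = 0; j <= i; ++j) {  // lower triangle; the i == j diagonal contributes distance 0
            double d = strategy.distance(pts.get(i), pts.get(j));
            if (Double.isNaN(max) || d > max) {
              max = d;
            }
          }
        }
        return max;
      }
    }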
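Finally, a hedged sketch of exercising the new functions. In Stellar, namespace and name combine with an underscore, so these resolve as GEOHASH_TO_LATLONG, GEOHASH_FROM_LATLONG, GEOHASH_DIST, and so on. The snippet below drives the classes directly from Java instead of going through the Stellar parser; it assumes it lives in the same package as GeoHashFunctions, passes a null Context (safe here because none of the apply() bodies above touch it), and uses a made-up geohash literal ("u10j4b") purely for illustration:

    import java.util.Arrays;

    public class GeoHashFunctionsDemo {
      public static void main(String[] args) {
        // 6-character geohash for a hypothetical lat/long
        Object hash = new GeoHashFunctions.FromLatLong()
            .apply(Arrays.<Object>asList(48.8566, 2.3522, 6), null);

        // Round-trip back to a map with "latitude" and "longitude" keys
        Object latLong = new GeoHashFunctions.ToLatLong()
            .apply(Arrays.<Object>asList(hash), null);

        // Great circle distance in km between our hash and the made-up one
        Object dist = new GeoHashFunctions.Dist()
            .apply(Arrays.<Object>asList(hash, "u10j4b", "HAVERSINE"), null);

        System.out.println(hash + " -> " + latLong + ", dist=" + dist);
      }
    }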