GitHub user nickwallen opened a pull request: https://github.com/apache/metron/pull/1025
METRON-1533 Create KAFKA_FIND Stellar function

This PR is built on #1024 and #1023. Review only the last commit to see the changes for this PR alone.

### Changes

I created a `KAFKA_FIND` function that accepts a filter expression so that only messages satisfying a condition are returned. For example:

- Find a message that has been enriched with geolocation data.
  ```
  KAFKA_FIND('indexing', m -> MAP_EXISTS('geo.city', m))
  ```

- Find a Bro message.
  ```
  KAFKA_FIND('indexing', m -> MAP_GET('source.type', m) == 'bro')
  ```

The message is presented to the filter lambda expression as a map of field values. This makes the filter expression a bit simpler to write.

Like the other `KAFKA_*` functions, this is not intended to be highly performant. It is only intended to make creating and modifying enrichments simpler in the REPL. See the **Use Case** section for more details on how I see this being used.

### Future

If Stellar were to support map literals in the future, this would become a fair bit simpler.
```
KAFKA_FIND('indexing', m -> m['source.type'] == 'bro')
```

### Use Case

When creating enrichments, I often want to validate that the enrichment I just created works on the live, incoming stream of telemetry. My workflow looks something like this.

1. Create and test the enrichment that I want to create.
    ```
    [Stellar]>>> ip_src_addr := "72.34.49.86"
    72.34.49.86

    [Stellar]>>> geo := GEO_GET(ip_src_addr)
    {country=US, dmaCode=803, city=Los Angeles, postalCode=90014, latitude=34.0438, location_point=34.0438,-118.2512, locID=5368361, longitude=-118.2512}
    ```

2. That looks good to me. Now let's add that to my Bro telemetry.
    ```
    [Stellar]>>> conf := SHELL_EDIT(conf)
    {
      "enrichment" : {
        "fieldMap": {
          "stellar": {
            "config": [
              "geo := GEO_GET(ip_src_addr)"
            ]
          }
        }
      },
      "threatIntel": {
      }
    }

    [Stellar]>>> CONFIG_PUT("ENRICHMENTS", conf, "bro")
    ```

3. It looks like that worked, but did that really work?
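
As a rough illustration of the filter lambdas described under **Changes**, the following Python sketch parses each message into a map of field values and keeps only those that satisfy a predicate. It is an analogy only, not Metron's implementation: `find_messages`, its `count` parameter, and the sample messages are hypothetical, and no Kafka client is involved.

```python
import json
from typing import Callable, Iterable, List


def find_messages(raw_messages: Iterable[str],
                  predicate: Callable[[dict], bool],
                  count: int = 1) -> List[dict]:
    """Return up to `count` messages whose parsed fields satisfy `predicate`."""
    found: List[dict] = []
    for raw in raw_messages:
        if len(found) >= count:
            break
        fields = json.loads(raw)  # each message becomes a map of field values
        if predicate(fields):
            found.append(fields)
    return found


# Hypothetical stand-in for messages read from the 'indexing' topic.
sample = [
    '{"source.type": "snort", "ip_src_addr": "10.0.0.1"}',
    '{"source.type": "bro", "ip_src_addr": "72.34.49.86", "geo.city": "Los Angeles"}',
    '{"source.type": "bro", "ip_src_addr": "10.0.0.2"}',
]

# Analogue of: m -> MAP_GET('source.type', m) == 'bro'
bro = find_messages(sample, lambda m: m.get("source.type") == "bro")

# Analogue of: m -> MAP_EXISTS('geo.city', m)
enriched = find_messages(sample, lambda m: "geo.city" in m)
```

Because the predicate receives a plain map, a condition such as "has a `geo.city` field" is a single key lookup rather than string matching against the raw message.
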

At this point, I would run `KAFKA_GET` as many times as it takes to retrieve a Bro message. I would have to get lucky twice: the enrichment has to have worked, and the message I pull down has to be a Bro message rather than one from a different sensor.

I would rather have a function that lets me pull back only the messages that I care about. In this case, I could retrieve only Bro messages.
```
KAFKA_FIND('indexing', m -> MAP_GET('source.type', m) == 'bro')
```

Or I could look for messages that contain geolocation data.
```
KAFKA_FIND('indexing', m -> MAP_EXISTS('geo.city', m))
```

### Changes

* Created the `KAFKA_FIND` function along with unit tests.
* Updated all `KAFKA_*` functions to use a standard `getArg` function so that argument handling is done the same way everywhere.

### Pull Request Checklist

- [x] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
- [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
- [x] Have you included steps or a guide to how the change may be verified and tested manually?
- [x] Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:
- [x] Have you written or updated unit tests and/or integration tests to verify your changes?
- [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [x] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/metron METRON-1533-NEW

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1025

----
commit 10fe2bfd04db6633b73e9d0f37d458f4ce83aaf9
Author: Nick Allen <nick@...>
Date:   2018-04-20T20:18:16Z

    METRON-1571

commit 73f153692d50af9038dbb9d77412c6298b615efb
Author: Nick Allen <nick@...>
Date:   2018-05-22T23:17:09Z

    METRON-1572 Enhance KAFKA_PUT function

commit 0f06cce8730d2323b65fc114d51ff61ff70b2b8b
Author: Nick Allen <nick@...>
Date:   2018-05-22T23:43:18Z

    METRON-1533 Create KAFKA_FIND Stellar function

----

---