Good evening,
I am working with metron 0.7.2. I have tried to implement a variation on the
solution at
https://metron.apache.org/current-book/use-cases/geographic_login_outliers/index.html,
with the modification that data is coming into a Kafka topic (JSON format)
instead of an import from a CSV.
No data has appeared in my Hbase profiler table. I discovered in the Storm UI
an error in the hbaseBolt for the profiler topology (see below). I tried an
insert at the command prompt, everything seemed okay but my experience with
HBase is still limited. My plan tomorrow is to try and debug things using
Stellar, but perhaps someone has seen this error somewhere and knows what is
wrong?
Thank you,
Tom.
java.lang.Error: Unresolved compilation problem: at
org.apache.metron.common.utils.SerDeUtils.toBytes(SerDeUtils.java:235) at
org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder.columns(ValueOnlyColumnBuilder.java:52)
at
org.apache.metron.profiler.storm.ProfileHBaseMapper.columns(ProfileHBaseMapper.java:82)
at org.apache.metron.hbase.bolt.HBaseBolt.save(HBaseBolt.java:183) at
org.apache.metron.hbase.bolt.HBaseBolt.execute(HBaseBolt.java:164) at
org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
at
org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
at
org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
at
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
at
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451)
at
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73)
at
org.apache.storm.daemon.executor$fn__10195$fn__10208$fn__10263.invoke(executor.clj:855)
at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) at
clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745)
(parser configuration)
{
"parserClassName":
"org.apache.metron.parsers.json.JSONMapParser",
"sensorTopic": "radius",
"outputTopic": "indexing",
"fieldTransformations": [{
"transformation": "STELLAR",
"output": [
"geohash"
],
"config": {
"geohash":
"GEOHASH_FROM_LOC(GEO_GET(client.ip))"
}
}]
}
(enrichment configuration)
{
"enrichment": {
"fieldMap": {
"stellar" : {
"config" : [
"geo_locations := MULTISET_MERGE( PROFILE_GET( 'locations_by_user',
user.id, PROFILE_FIXED( 15, 'MINUTES')))",
"geo_centroid := GEOHASH_CENTROID(geo_locations)",
"geo_distance := TO_INTEGER(GEOHASH_DIST(geo_centroid, geohash))",
"geo_locations := null"
]
}
}
,"fieldToTypeMap": { }
},
"threatIntel": {
"fieldMap": {
"stellar" : {
"config" : [
"geo_distance_distr:= STATS_MERGE( PROFILE_GET(
'geo_distribution_from_centroid', 'global', PROFILE_FIXED( 15, 'MINUTES')))",
"dist_median := STATS_PERCENTILE(geo_distance_distr, 50.0)",
"dist_sd := STATS_SD(geo_distance_distr)",
"geo_outlier := ABS(dist_median - geo_distance) >= 5*dist_sd",
"is_alert := is_alert || (geo_outlier != null && geo_outlier ==
true)",
"geo_distance_distr := null"
]
}
},
"fieldToTypeMap": { },
"triageConfig" : {
"riskLevelRules" : [
{
"name" : "Geographic Outlier",
"comment" : "Determine if the user's geographic distance from the
centroid of the historic logins is an outlier as compared to all users.",
"rule" : "geo_outlier != null && geo_outlier",
"score" : 10,
"reason" : "FORMAT('user %s has a distance (%d) from the centroid of
their historic logins that is at least 5 std deviations (%f) from the median (%f)', user.id,
geo_distance, dist_sd, dist_median)"
}
],
"aggregator" : "MAX"
}
}
}
(profiler.json)
{
"profiles": [
{
"profile": "geo_distribution_from_centroid",
"foreach": "'global'",
"onlyif": "geo_distance != null",
"init": {
"s": "STATS_INIT()"
},
"update": {
"s": "STATS_ADD(s, geo_distance)"
},
"result": {
"profile":"s"
}
},
{
"profile": "locations_by_user",
"foreach": "user.id",
"onlyif": "geohash != null && LENGTH(geohash) > 0",
"init": {
"s": "MULTISET_INIT()"
},
"update": {
"s": "MULTISET_ADD(s, geohash)"
},
"result": {
"profile":"s"
}
}
]
}
smime.p7s
Description: S/MIME cryptographic signature
