This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push: new 8686f52 METRON-2284 Metron Profiler for Spark Doesn't Work as Expected (nickwallen) closes apache/metron#1556 8686f52 is described below commit 8686f5265e68a5fb861f944b4b3b086c807618f2 Author: nickwallen <nickal...@apache.org> AuthorDate: Tue Nov 12 14:56:35 2019 -0500 METRON-2284 Metron Profiler for Spark Doesn't Work as Expected (nickwallen) closes apache/metron#1556 --- .../metron/profiler/spark/BatchProfiler.java | 4 +-- .../spark/BatchProfilerIntegrationTest.java | 29 +++++++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java index 571545e..43b42be 100644 --- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java @@ -87,13 +87,13 @@ public class BatchProfiler implements Serializable { // find all routes for each message Dataset<MessageRoute> routes = telemetry - .flatMap(messageRouterFunction(profilerProps, profiles, globals), Encoders.bean(MessageRoute.class)); + .flatMap(messageRouterFunction(profilerProps, profiles, globals), Encoders.kryo(MessageRoute.class)); LOG.debug("Generated {} message route(s)", routes.cache().count()); // build the profiles Dataset<ProfileMeasurementAdapter> measurements = routes .groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING()) - .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.bean(ProfileMeasurementAdapter.class)); + .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.kryo(ProfileMeasurementAdapter.class)); LOG.debug("Produced {} profile measurement(s)", measurements.cache().count()); // write the 
profile measurements to HBase diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java index b36cf8c..72fd283 100644 --- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java @@ -19,19 +19,24 @@ */ package org.apache.metron.profiler.spark; +import com.google.common.collect.Maps; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.profiler.MessageRoute; import org.apache.metron.profiler.client.stellar.FixedLookback; import org.apache.metron.profiler.client.stellar.GetProfile; import org.apache.metron.profiler.client.stellar.WindowLookback; +import org.apache.metron.profiler.spark.function.MessageRouterFunction; import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; import org.apache.metron.stellar.common.StellarStatefulExecutor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver; import org.apache.spark.SparkConf; import org.apache.spark.SparkException; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.json.simple.JSONObject; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -41,7 +46,9 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -62,6 +69,7 @@ import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INP import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON; import static org.apache.metron.profiler.spark.reader.TelemetryReaders.ORC; import static org.apache.metron.profiler.spark.reader.TelemetryReaders.PARQUET; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -145,6 +153,14 @@ public class BatchProfilerIntegrationTest { * "init": { "count": 0 }, * "update": { "count": "count + 1" }, * "result": "count" + * }, + * { + * "profile": "response-body-len", + * "onlyif": "exists(response_body_len)", + * "foreach": "ip_src_addr", + * "init": { "len": 0 }, + * "update": { "len": "len + response_body_len" }, + * "result": "TO_INTEGER(len)" * } * ] * } @@ -345,7 +361,15 @@ public class BatchProfilerIntegrationTest { * "init": { "count": "STATS_INIT()" }, * "update": { "count": "STATS_ADD(count, 1)" }, * "result": "TO_INTEGER(STATS_COUNT(count))" - * } + * }, + * { + * "profile": "response-body-len", + * "onlyif": "exists(response_body_len)", + * "foreach": "ip_src_addr", + * "init": { "len": "STATS_INIT()" }, + * "update": { "len": "STATS_ADD(len, response_body_len)" }, + * "result": "TO_INTEGER(STATS_SUM(len))" + * } * ] * } */ @@ -386,6 +410,9 @@ public class BatchProfilerIntegrationTest { // there are 100 messages in all assertTrue(execute("[100] == PROFILE_GET('total-count', 'total', window)", Boolean.class)); + + // check the sum of the `response_body_len` field + assertTrue(execute("[1029726] == PROFILE_GET('response-body-len', '192.168.138.158', window)", Boolean.class)); } private Properties getGlobals() {