METRON-1365: Allow PROFILE_GET to return a default value for any period in 
which no value was written for a profile and entity. closes apache/incubator-metron#871


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/76bed5d7
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/76bed5d7
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/76bed5d7

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 76bed5d754fcf358809f0be7a034758b9b20fc5e
Parents: 196da12
Author: cstella <ceste...@gmail.com>
Authored: Thu Dec 21 16:49:31 2017 -0500
Committer: cstella <ceste...@gmail.com>
Committed: Thu Dec 21 16:49:31 2017 -0500

----------------------------------------------------------------------
 .../metron-profiler-client/README.md            |  1 +
 .../profiler/client/HBaseProfilerClient.java    | 35 +++++++++++------
 .../metron/profiler/client/ProfilerClient.java  | 10 +++--
 .../profiler/client/stellar/GetProfile.java     |  8 ++--
 .../client/stellar/ProfilerClientConfig.java    |  6 ++-
 .../client/HBaseProfilerClientTest.java         | 40 +++++++++++++-------
 .../profiler/client/stellar/GetProfileTest.java | 38 +++++++++++++++++--
 7 files changed, 102 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/76bed5d7/metron-analytics/metron-profiler-client/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/README.md 
b/metron-analytics/metron-profiler-client/README.md
index a15ccf8..4156058 100644
--- a/metron-analytics/metron-profiler-client/README.md
+++ b/metron-analytics/metron-profiler-client/README.md
@@ -58,6 +58,7 @@ want to change the global Client configuration so as not to 
disrupt the work of
 | profiler.client.hbase.table           | The name of the HBase table used to 
store profile data.                                                             
               | Optional | profiler |
 | profiler.client.hbase.column.family   | The name of the HBase column family 
used to store profile data.                                                     
               | Optional | P        |
 | profiler.client.salt.divisor          | The salt divisor used to store 
profile data.                                                                   
                    | Optional | 1000     |
+| profiler.default.value                | The default value returned for any period in which no value was 
written for the given profile and entity. If unset, such periods are omitted from the result.           
               | Optional | null     |
 | hbase.provider.impl                   | The name of the HBaseTableProvider 
implementation class.                                                           
                | Optional |          |
 
 ### Profile Selectors

http://git-wip-us.apache.org/repos/asf/metron/blob/76bed5d7/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index 7c4ec84..de2d42c 100644
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -71,14 +72,15 @@ public class HBaseProfilerClient implements ProfilerClient {
    * @param groups      The groups used to sort the profile data.
    * @param durationAgo How far in the past to fetch values from.
    * @param unit        The time unit of 'durationAgo'.
+   * @param defaultValue The default value to specify.  If empty, the result 
will be sparse.
    * @param <T>         The type of values stored by the Profile.
    * @return A list of values.
    */
   @Override
-  public <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, long durationAgo, TimeUnit unit) {
+  public <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, long durationAgo, TimeUnit unit, Optional<T> defaultValue) 
{
     long end = System.currentTimeMillis();
     long start = end - unit.toMillis(durationAgo);
-    return fetch(clazz, profile, entity, groups, start, end);
+    return fetch(clazz, profile, entity, groups, start, end, defaultValue);
   }
 
   /**
@@ -90,11 +92,12 @@ public class HBaseProfilerClient implements ProfilerClient {
    * @param groups  The groups used to sort the profile data.
    * @param start   The start time in epoch milliseconds.
    * @param end     The end time in epoch milliseconds.
+   * @param defaultValue The default value to specify.  If empty, the result 
will be sparse.
    * @param <T>     The type of values stored by the profile.
    * @return A list of values.
    */
   @Override
-  public <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, long start, long end) {
+  public <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, long start, long end, Optional<T> defaultValue) {
     byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
     byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
 
@@ -108,7 +111,7 @@ public class HBaseProfilerClient implements ProfilerClient {
             .collect(Collectors.toList());
 
     // get the 'gets'
-    return get(gets, columnQualifier, columnFamily, clazz);
+    return get(gets, columnQualifier, columnFamily, clazz, defaultValue);
   }
 
   /**
@@ -119,10 +122,11 @@ public class HBaseProfilerClient implements 
ProfilerClient {
    * @param entity     The name of the entity.
    * @param groups     The groups used to sort the profile data.
    * @param periods    The set of profile measurement periods
+   * @param defaultValue The default value to specify.  If empty, the result 
will be sparse.
    * @return A list of values.
    */
   @Override
-  public <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, Iterable<ProfilePeriod> periods) {
+  public <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, Iterable<ProfilePeriod> periods, Optional<T> defaultValue) 
{
     byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
     byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
 
@@ -136,7 +140,7 @@ public class HBaseProfilerClient implements ProfilerClient {
             .collect(Collectors.toList());
 
     // get the 'gets'
-    return get(gets, columnQualifier, columnFamily, clazz);
+    return get(gets, columnQualifier, columnFamily, clazz, defaultValue);
   }
 
   /**
@@ -146,19 +150,26 @@ public class HBaseProfilerClient implements 
ProfilerClient {
    * @param columnQualifier The column qualifier.
    * @param columnFamily    The column family.
    * @param clazz           The type expected in return.
+   * @param defaultValue The default value to specify.  If empty, the result 
will be sparse.
    * @param <T>             The type expected in return.
    * @return
    */
-  private <T> List<T> get(List<Get> gets, byte[] columnQualifier, byte[] 
columnFamily, Class<T> clazz) {
+  private <T> List<T> get(List<Get> gets, byte[] columnQualifier, byte[] 
columnFamily, Class<T> clazz, Optional<T> defaultValue) {
     List<T> values = new ArrayList<>();
 
     try {
       Result[] results = table.get(gets);
-      Arrays.stream(results)
-              .filter(r -> r.containsColumn(columnFamily, columnQualifier))
-              .map(r -> r.getValue(columnFamily, columnQualifier))
-              .forEach(val -> values.add(SerDeUtils.fromBytes(val, clazz)));
-
+      for(int i = 0;i < results.length;++i) {
+        Result result = results[i];
+        boolean exists = result.containsColumn(columnFamily, columnQualifier);
+        if(!exists && defaultValue.isPresent()) {
+          values.add(defaultValue.get());
+        }
+        else if(exists) {
+          byte[] val = result.getValue(columnFamily, columnQualifier);
+          values.add(SerDeUtils.fromBytes(val, clazz));
+        }
+      }
     } catch(IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/76bed5d7/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
index 57b0e04..bab4ec9 100644
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
@@ -23,6 +23,7 @@ package org.apache.metron.profiler.client;
 import org.apache.metron.profiler.ProfilePeriod;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -39,10 +40,11 @@ public interface ProfilerClient {
    * @param groups      The groups used to sort the profile data.
    * @param durationAgo How far in the past to fetch values from.
    * @param unit        The time unit of 'durationAgo'.
+   * @param defaultValue The default value to specify.  If empty, the result 
will be sparse.
    * @param <T>         The type of values stored by the Profile.
    * @return A list of values.
    */
-  <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, long durationAgo, TimeUnit unit);
+  <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, long durationAgo, TimeUnit unit, Optional<T> defaultValue);
 
   /**
    * Fetch the values stored in a profile based on a start and end timestamp.
@@ -53,10 +55,11 @@ public interface ProfilerClient {
    * @param groups  The groups used to sort the profile data.
    * @param start   The start time in epoch milliseconds.
    * @param end     The end time in epoch milliseconds.
+   * @param defaultValue The default value to specify.  If empty, the result 
will be sparse.
    * @param <T>     The type of values stored by the profile.
    * @return A list of values.
    */
-  <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, long start, long end);
+  <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, long start, long end, Optional<T> defaultValue);
 
   /**
    * Fetch the values stored in a profile based on a set of period keys.
@@ -66,8 +69,9 @@ public interface ProfilerClient {
    * @param entity  The name of the entity.
    * @param groups  The groups used to sort the profile data.
    * @param periods The set of profile period keys
+   * @param defaultValue The default value to specify.  If empty, the result 
will be sparse.
    * @param <T>     The type of values stored by the profile.
    * @return A list of values.
    */
-  <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, Iterable<ProfilePeriod> periods);
+  <T> List<T> fetch(Class<T> clazz, String profile, String entity, 
List<Object> groups, Iterable<ProfilePeriod> periods, Optional<T> defaultValue);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/76bed5d7/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index 802c552..73cd5a1 100644
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -163,7 +163,7 @@ public class GetProfile implements StellarFunction {
     }
 
     Map<String, Object> effectiveConfig = getEffectiveConfig(context, 
configOverridesMap);
-
+    Object defaultValue = null;
     //lazily create new profiler client if needed
     if (client == null || !cachedConfigMap.equals(effectiveConfig)) {
       RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(effectiveConfig);
@@ -172,8 +172,10 @@ public class GetProfile implements StellarFunction {
       client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
       cachedConfigMap = effectiveConfig;
     }
-
-    return client.fetch(Object.class, profile, entity, groups, 
periods.orElse(new ArrayList<>(0)));
+    if(cachedConfigMap != null) {
+      defaultValue = 
ProfilerClientConfig.PROFILER_DEFAULT_VALUE.get(cachedConfigMap);
+    }
+    return client.fetch(Object.class, profile, entity, groups, 
periods.orElse(new ArrayList<>(0)), Optional.ofNullable(defaultValue));
   }
 
 

http://git-wip-us.apache.org/repos/asf/metron/blob/76bed5d7/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
index 351b807..9bbc29d 100644
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
@@ -56,8 +56,12 @@ public enum ProfilerClientConfig {
   /**
    * A global property that defines the salt divisor used to store profile 
data.
    */
-  PROFILER_SALT_DIVISOR("profiler.client.salt.divisor", 1000L, Long.class);
+  PROFILER_SALT_DIVISOR("profiler.client.salt.divisor", 1000L, Long.class),
 
+  /**
+   * The default value to be returned if a profile is not written for a given 
period for a profile and entity.
+   */
+  PROFILER_DEFAULT_VALUE("profiler.default.value", null, Object.class);
   String key;
   Object defaultValue;
   Class<?> valueType;

http://git-wip-us.apache.org/repos/asf/metron/blob/76bed5d7/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index 8519f10..96c0d91 100644
--- 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -34,9 +34,11 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests the HBaseProfilerClient.
@@ -99,12 +101,15 @@ public class HBaseProfilerClientTest {
     profileWriter.write(m, count, Arrays.asList("weekdays"), val -> 
expectedValue);
     profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
 
-    // execute
-    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
Arrays.asList("weekdays"), hours, TimeUnit.HOURS);
+    //valid results
+    {
+      // execute
+      List<Integer> results = client.fetch(Integer.class, "profile1", 
"entity1", Arrays.asList("weekdays"), hours, TimeUnit.HOURS, Optional.empty());
 
-    // validate
-    assertEquals(count, results.size());
-    results.forEach(actual -> assertEquals(expectedValue, (int) actual));
+      // validate
+      assertEquals(count, results.size());
+      results.forEach(actual -> assertEquals(expectedValue, (int) actual));
+    }
   }
 
   /**
@@ -128,10 +133,19 @@ public class HBaseProfilerClientTest {
 
     // execute
     List<Object> doesNotExist = Arrays.asList("does-not-exist");
-    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
doesNotExist, hours, TimeUnit.HOURS);
-
-    // validate
-    assertEquals(0, results.size());
+    {
+      List<Integer> results = client.fetch(Integer.class, "profile1", 
"entity1", doesNotExist, hours, TimeUnit.HOURS, Optional.empty());
+
+      // validate
+      assertEquals(0, results.size());
+    }
+    {
+      //with a default value, we'd expect a bunch of 0's
+      List<Integer> results = client.fetch(Integer.class, "profile1", 
"entity1", doesNotExist, hours, TimeUnit.HOURS, Optional.of(0));
+      //8 or 9 15 minute periods in 2 hours depending on when you start
+      assertTrue(results.size() == 8 || results.size() == 9);
+      results.forEach(actual -> assertEquals(0, (int) actual));
+    }
   }
 
   /**
@@ -152,7 +166,7 @@ public class HBaseProfilerClientTest {
     profileWriter.write(m, hours * periodsPerHour, group, val -> 1000);
 
     // execute
-    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
group, 2, TimeUnit.MILLISECONDS);
+    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
group, 2, TimeUnit.MILLISECONDS, Optional.empty());
 
     // validate - there should NOT be any results from just 2 milliseconds ago
     assertEquals(0, results.size());
@@ -179,7 +193,7 @@ public class HBaseProfilerClientTest {
     profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
 
     // execute
-    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
Arrays.asList("weekdays"), startTime, endTime);
+    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
Arrays.asList("weekdays"), startTime, endTime, Optional.empty());
 
     // validate
     assertEquals(count, results.size());
@@ -210,7 +224,7 @@ public class HBaseProfilerClientTest {
 
     // execute
     List<Object> doesNotExist = Arrays.asList("does-not-exist");
-    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
doesNotExist, startTime, endTime);
+    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
doesNotExist, startTime, endTime, Optional.empty());
 
     // validate
     assertEquals(0, results.size());
@@ -238,7 +252,7 @@ public class HBaseProfilerClientTest {
     // execute
     final long endFetchAt = System.currentTimeMillis();
     final long startFetchAt = endFetchAt - TimeUnit.MILLISECONDS.toMillis(30);
-    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
group, startFetchAt, endFetchAt);
+    List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", 
group, startFetchAt, endFetchAt, Optional.empty());
 
     // validate - there should NOT be any results from just 2 milliseconds ago
     assertEquals(0, results.size());

http://git-wip-us.apache.org/repos/asf/metron/blob/76bed5d7/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
index 307b548..6ed6e64 100644
--- 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
+++ 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
@@ -39,12 +39,13 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.Map;
 import java.util.HashMap;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.*;
 
@@ -311,6 +312,35 @@ public class GetProfileTest {
   }
 
   /**
+   * Default value should be able to be specified
+   */
+  @Test
+  public void testWithDefaultValue() {
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'))";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to fail to read any values because we didn't write 
any.
+    Assert.assertEquals(0, result.size());
+
+    // execute - read the profile values - with config_override.
+    // first two override values are strings, third is deliberately a number.
+    testOverride("{'profiler.default.value' : 0}", 0);
+    testOverride("{'profiler.default.value' : 'metron'}", "metron");
+    testOverride("{'profiler.default.value' : []}", new ArrayList<>());
+  }
+
+  private void testOverride(String overrides, Object defaultVal) {
+      String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'), [], " + overrides + ")";
+      List<Object> result = run(expr, List.class);
+
+      // validate - expect to read all values from the past 4 hours (16 or 17 
values depending on start time)
+      // but they should all be the default value.
+      Assert.assertTrue(result.size() == 16 || result.size() == 17);
+      result.forEach(actual -> Assert.assertEquals(defaultVal, actual));
+  }
+
+  /**
    * Values should be retrievable that were written with configuration 
different than current global config.
    */
   @Test

Reply via email to