[ 
https://issues.apache.org/jira/browse/METRON-1005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099596#comment-16099596
 ] 

ASF GitHub Bot commented on METRON-1005:
----------------------------------------

Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129226069
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.</li>
    + * <li>profile - The name of the profile.</li>
    + * <li>entity - The name of the entity being profiled.</li>
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.</li>
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.</li>
    + * <li>period duration - The duration of each period in milliseconds.</li>
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the HBase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    long originalStart = start;
    +    start = Math.min(originalStart, end);
    +    end = Math.max(originalStart, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      // decode as UTF-8 to match the Bytes.toBytes(String) call used when encoding
    +      String profile = Bytes.toString(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      // decode as UTF-8 to match the Bytes.toBytes(String) call used when encoding
    +      String entity = Bytes.toString(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        // decode as UTF-8 to match the Bytes.toBytes(String) call used when encoding
    +        String group = Bytes.toString(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    +
    +    // encode each of the groups
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups)
    +            .order(byteOrder)
    +            .putInt(groups.size());
    +
    +    for(byte[] groupB : groupBytes) {
    +      buffer.putInt(groupB.length).put(groupB);
    +    }
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
    +   */
    +  private static byte[] encodePeriod(ProfilePeriod period) {
    +    return encodePeriod(period.getPeriod());
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period the period
    +   */
    +  private static byte[] encodePeriod(long period) {
    +    return Bytes.toBytes(period);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the HBase cluster.
    +   *
    +   * @param period The period in which a profile measurement is taken.
    +   */
    +  public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) {
    +    return encodeSalt(period.getPeriod(), saltDivisor);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the HBase cluster.
    +   *
    +   * @param period The period
    +   * @param saltDivisor The salt divisor
    +   */
    +  public static byte[] encodeSalt(long period, int saltDivisor) {
    +    try {
    +      // an MD5 is 16 bytes aka 128 bits
    +      MessageDigest digest = MessageDigest.getInstance("MD5");
    +      byte[] hash = digest.digest(encodePeriod(period));
    +      int salt = Bytes.toShort(hash) % saltDivisor;
    +      return Bytes.toBytes(salt);
    --- End diff --
    
    @All, please note that my overall review comment (as opposed to these
    detailed comments) wasn't published to email by GitHub.  Please see it at
    https://github.com/apache/metron/pull/622#pullrequestreview-51932717
    It does address the issue of changed profile specs being different profiles.
    
    @simonellistonball, your question brings up the issue of why we use salts
    at all.  Apparently the leading bytes are important; if they weren't, there
    would be no reason to use a salt, and we could just trust the later bytes to
    sufficiently differentiate the partition/region for the row.  As I
    understand HBase table partitioning (imperfectly), the more times the table
    has been split, and the broader and more well-mixed the set of rowkeys in
    the table, the more likely it is that rowkeys will end up in the same
    partition (region) unless they differ early in the byte array.  So, if the
    HBase instance mostly holds data with similar rowkeys, then the bytes later
    in the rowkey, such as the profile name, will differentiate them
    sufficiently.  But as the table fills up with a broader range of rowkeys, as
    it will with our salting strategy, and as it grows larger over time, a bunch
    of inserts with the same salt (leading few bytes) is more likely to land in
    the same partition.

    I'm not an HBase expert, so if you feel I've got this wrong, please clarify.
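
The salt calculation discussed in this comment can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from the pull request: the class `SaltSketch` and its methods are hypothetical, it uses only the JDK instead of HBase's `Bytes`, and it swaps in `Math.floorMod` so that the salt can never be negative. It derives a salt from the first two bytes of `md5(period)` and counts how many consecutive periods land in each salt bucket.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of the salting scheme; not part of the pull request. */
final class SaltSketch {

  /** Salt bucket for a period: the first two bytes of md5(period), reduced modulo the divisor. */
  static int saltFor(long period, int saltDivisor) {
    try {
      // encode the period as 8 big-endian bytes, the same byte order the diff uses
      byte[] periodBytes = ByteBuffer.allocate(Long.BYTES).putLong(period).array();
      byte[] hash = MessageDigest.getInstance("MD5").digest(periodBytes);
      // read the first two bytes of the hash as a signed, big-endian short
      short prefix = ByteBuffer.wrap(hash).getShort();
      // assumption: floorMod keeps the bucket in [0, saltDivisor); a plain % can go negative
      return Math.floorMod(prefix, saltDivisor);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  /** Counts how many of the periods in [firstPeriod, firstPeriod + count) land in each bucket. */
  static int[] histogram(long firstPeriod, int count, int saltDivisor) {
    int[] buckets = new int[saltDivisor];
    for (long p = firstPeriod; p < firstPeriod + count; p++) {
      buckets[saltFor(p, saltDivisor)]++;
    }
    return buckets;
  }

  public static void main(String[] args) {
    int[] buckets = histogram(1_000_000L, 10_000, 10);
    for (int i = 0; i < buckets.length; i++) {
      System.out.println("salt " + i + ": " + buckets[i] + " periods");
    }
  }
}
```

Because the salt depends only on the period, every row written for the same period carries the same salt, whatever its profile or entity; the histogram only shows how different periods spread across buckets.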


> Create Decodable Row Key for Profiler
> -------------------------------------
>
>                 Key: METRON-1005
>                 URL: https://issues.apache.org/jira/browse/METRON-1005
>             Project: Metron
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Nick Allen
>            Assignee: Nick Allen
>             Fix For: Next + 1
>
>
> To be able to answer the types of questions that I outlined in METRON-450, we 
> need a row key that is decodable.  Right now there is no logic to decode a 
> row key, nor is the existing row key easily decodable.  
> Once the row keys can be decoded, you could scan all of the row keys in the 
> Profiler's HBase table, decode each of them, and extract things like the 
> names of all your profiles, the names of entities within a profile, and the 
> period duration of a given profile.
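
The scan-and-decode idea in the issue description can be sketched with a simplified key. This is only an illustration under stated assumptions: `DecodableKeySketch` and its layout (magic number, length-prefixed profile and entity, then the period) are hypothetical and much smaller than the row key in the pull request, and a plain list of byte arrays stands in for an HBase table scan.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified decodable key; not the layout from the pull request. */
final class DecodableKeySketch {

  static final short MAGIC = 77;

  /** Encodes the magic number, the length-prefixed profile and entity, then the period. */
  static byte[] encode(String profile, String entity, long period) {
    byte[] p = profile.getBytes(StandardCharsets.UTF_8);
    byte[] e = entity.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer
        .allocate(Short.BYTES + Integer.BYTES + p.length + Integer.BYTES + e.length + Long.BYTES)
        .putShort(MAGIC)
        .putInt(p.length).put(p)
        .putInt(e.length).put(e)
        .putLong(period)
        .array();
  }

  /** Decodes a key produced by encode() into {profile, entity, period}. */
  static String[] decode(byte[] key) {
    try {
      ByteBuffer buffer = ByteBuffer.wrap(key);
      if (buffer.getShort() != MAGIC) {
        throw new IllegalArgumentException("Invalid magic number");
      }
      String profile = readString(buffer);
      String entity = readString(buffer);
      long period = buffer.getLong();
      return new String[] { profile, entity, Long.toString(period) };
    } catch (BufferUnderflowException ex) {
      throw new IllegalArgumentException("Unable to decode the row key", ex);
    }
  }

  /** Reads one length-prefixed UTF-8 string, rejecting lengths that cannot fit in the buffer. */
  private static String readString(ByteBuffer buffer) {
    int length = buffer.getInt();
    if (length < 0 || length > buffer.remaining()) {
      throw new IllegalArgumentException("Invalid field length: " + length);
    }
    byte[] bytes = new byte[length];
    buffer.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Decodes every key and collects the distinct profile names in sorted order. */
  static Set<String> profileNames(List<byte[]> keys) {
    Set<String> names = new TreeSet<>();
    for (byte[] key : keys) {
      names.add(decode(key)[0]);
    }
    return names;
  }

  public static void main(String[] args) {
    List<byte[]> keys = Arrays.asList(
        encode("profile1", "192.168.1.1", 25L),
        encode("profile2", "192.168.1.1", 25L),
        encode("profile1", "192.168.1.2", 26L));
    System.out.println(profileNames(keys));
  }
}
```

The same loop could collect entity names or periods instead; the point is that each field is recoverable from the key bytes alone.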



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
