[ https://issues.apache.org/jira/browse/METRON-1005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099756#comment-16099756 ]
ASF GitHub Bot commented on METRON-1005: ---------------------------------------- Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/622#discussion_r129252112 --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java --- @@ -0,0 +1,402 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.metron.profiler.hbase; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.ProfilePeriod; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD; +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS; +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR; + +/** + * Responsible for building the row keys used to store profile data in HBase. + * + * This builder generates decodable row keys. A decodable row key is one that can be interrogated to extract + * the constituent components of that row key. Given a previously generated row key this builder + * can extract the profile name, entity name, group name(s), period duration, and period. + * + * The row key is composed of the following fields. + * <ul> + * <li>magic number - Helps to validate the row key.</li> + * <li>version - The version number of the row key.</li> + * <li>salt - A salt that helps prevent hot-spotting. + * <li>profile - The name of the profile. + * <li>entity - The name of the entity being profiled. + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays. + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically. + * </ul> + */ +public class DecodableRowKeyBuilder implements RowKeyBuilder { + + /** + * Defines the byte order when encoding and decoding the row keys. 
+ * + * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :) + */ + private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + + /** + * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys. + */ + private static final int MAX_FIELD_LENGTH = 1000; + + /** + * A magic number embedded in each row key to help validate the row key and byte ordering when decoding. + */ + protected static final short MAGIC_NUMBER = 77; + + /** + * The version number of the row keys supported by this builder. + */ + protected static final byte VERSION = (byte) 1; + + /** + * A salt can be prepended to the row key to help prevent hot-spotting. The salt + * divisor is used to generate the salt. The salt divisor should be roughly equal + * to the number of nodes in the Hbase cluster. + */ + private int saltDivisor; + + /** + * The duration of each profile period in milliseconds. + */ + private long periodDurationMillis; + + public DecodableRowKeyBuilder() { + this(PROFILER_SALT_DIVISOR.getDefault(Integer.class), + PROFILER_PERIOD.getDefault(Long.class), + TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class))); + } + + public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) { + this.saltDivisor = saltDivisor; + this.periodDurationMillis = units.toMillis(duration); + } + + /** + * Builds a list of row keys necessary to retrieve profile measurements over + * a time horizon. + * + * @param profile The name of the profile. + * @param entity The name of the entity. + * @param groups The group(s) used to sort the profile data. + * @param start When the time horizon starts in epoch milliseconds. + * @param end When the time horizon ends in epoch milliseconds. + * @return All of the row keys necessary to retrieve the profile measurements. 
+ */ + @Override + public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) { + // be forgiving of out-of-order start and end times; order is critical to this algorithm + end = Math.max(start, end); + start = Math.min(start, end); + + // find the starting period and advance until the end time is reached + return ProfilePeriod.visitPeriods( start + , end + , periodDurationMillis + , TimeUnit.MILLISECONDS + , Optional.empty() + , period -> encode(profile, entity, groups, period) + ); + + } + + /** + * Builds a list of row keys necessary to retrieve a profile's measurements over + * a time horizon. + * <p> + * This method is useful when attempting to read ProfileMeasurements stored in HBase. + * + * @param profile The name of the profile. + * @param entity The name of the entity. + * @param groups The group(s) used to sort the profile data. + * @param periods The profile measurement periods to compute the rowkeys for + * @return All of the row keys necessary to retrieve the profile measurements. + */ + @Override + public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) { + List<byte[]> rowKeys = new ArrayList<>(); + for(ProfilePeriod period : periods) { + rowKeys.add(encode(profile, entity, groups, period)); + } + return rowKeys; + } + + /** + * Builds the row key for a given profile measurement. + * @param m The profile measurement. + * @return The HBase row key. + */ + @Override + public byte[] encode(ProfileMeasurement m) { + return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod()); + } + + /** + * Build the row key. + * @param profile The name of the profile. + * @param entity The name of the entity. + * @param period The period in which the measurement was taken. + * @param groups The groups. + * @return The HBase row key. 
+ */ + public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) { + + if(profile == null) + throw new IllegalArgumentException("Cannot encode row key; invalid profile name."); + if(entity == null) + throw new IllegalArgumentException("Cannot encode row key; invalid entity name."); + if(period == null) + throw new IllegalArgumentException("Cannot encode row key; invalid profile period."); + + long periodId = period.getPeriod(); + long periodDurationMillis = period.getDurationMillis(); + + byte[] salt = encodeSalt(periodId, saltDivisor); + byte[] profileB = Bytes.toBytes(profile); + byte[] entityB = Bytes.toBytes(entity); + byte[] groupB = encodeGroups(groups); + + int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2); + ByteBuffer buffer = ByteBuffer + .allocate(capacity) + .order(byteOrder) + .putShort(MAGIC_NUMBER) + .put(VERSION) + .putInt(salt.length) + .put(salt) + .putInt(profileB.length) + .put(profileB) + .putInt(entityB.length) + .put(entityB) + .put(groupB) + .putLong(periodId) + .putLong(periodDurationMillis); + + return buffer.array(); + } + + /** + * Decodes a row key to build a ProfileMeasurement containing all of the + * relevant fields that exist within the row key. + * + * @param rowKey The row key to decode. + * @return A ProfileMeasurement. 
+ */ + @Override + public ProfileMeasurement decode(byte[] rowKey) { + ByteBuffer buffer = ByteBuffer + .wrap(rowKey) + .order(byteOrder); + + try { + // validate the magic number + short magicNumber = buffer.getShort(); + if(magicNumber != MAGIC_NUMBER) { + throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber)); + } + + // validate the row key version + byte version = buffer.get(); + if(version != VERSION) { + throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version)); + } + + // validate the salt length + int saltLength = buffer.getInt(); + if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) { + throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength)); + } + + // decode the salt + byte[] salt = new byte[saltLength]; + buffer.get(salt); + + // validate the profile length + int profileLength = buffer.getInt(); + if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) { + throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength)); + } + + // decode the profile name + byte[] profileBytes = new byte[profileLength]; + buffer.get(profileBytes); + String profile = new String(profileBytes); + + // validate the entity length + int entityLength = buffer.getInt(); + if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) { + throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength)); + } + + // decode the entity + byte[] entityBytes = new byte[entityLength]; + buffer.get(entityBytes); + String entity = new String(entityBytes); + + // decode the groups + List<Object> groups = new ArrayList<>(); + int numberOfGroups = buffer.getInt(); + for (int i = 0; i < numberOfGroups; i++) { + + // validate the group length + int 
groupLength = buffer.getInt(); + if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) { + throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength)); + } + + // decode the group + byte[] groupBytes = new byte[groupLength]; + buffer.get(groupBytes); + + String group = new String(groupBytes); + groups.add(group); + } + + // decode the period + long periodId = buffer.getLong(); + long duration = buffer.getLong(); + ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS); + + return new ProfileMeasurement() + .withProfileName(profile) + .withEntity(entity) + .withGroups(groups) + .withProfilePeriod(period); + + } catch(BufferUnderflowException e) { + throw new IllegalArgumentException("Unable to decode the row key", e); + } + + } + + /** + * Builds the 'group' component of the row key. + * @param groups The groups to include in the row key. + */ + private byte[] encodeGroups(List<Object> groups) { + + // encode each of the groups and determine their size + int lengthOfGroups = 0; + List<byte[]> groupBytes = new ArrayList<>(); + for(Object group : groups) { + byte[] groupB = Bytes.toBytes(String.valueOf(group)); + groupBytes.add(groupB); + lengthOfGroups += groupB.length; + } + + // encode each of the groups + ByteBuffer buffer = ByteBuffer + .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups) + .order(byteOrder) + .putInt(groups.size()); + + for(byte[] groupB : groupBytes) { + buffer.putInt(groupB.length).put(groupB); + } + + return buffer.array(); + } + + /** + * Builds the 'time' portion of the row key + * @param period The ProfilePeriod in which the ProfileMeasurement was taken. 
+ */ + private static byte[] encodePeriod(ProfilePeriod period) { + return encodePeriod(period.getPeriod()); + } + + /** + * Builds the 'time' portion of the row key + * @param period the period + */ + private static byte[] encodePeriod(long period) { + return Bytes.toBytes(period); + } + + /** + * Calculates a salt value that is used as part of the row key. + * + * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally + * is close to the number of nodes in the Hbase cluster. + * + * @param period The period in which a profile measurement is taken. + */ + public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) { + return encodeSalt(period.getPeriod(), saltDivisor); + } + + /** + * Calculates a salt value that is used as part of the row key. + * + * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally + * is close to the number of nodes in the Hbase cluster. + * + * @param period The period + * @param saltDivisor The salt divisor + */ + public static byte[] encodeSalt(long period, int saltDivisor) { + try { + // an MD5 is 16 bytes aka 128 bits + MessageDigest digest = MessageDigest.getInstance("MD5"); + byte[] hash = digest.digest(encodePeriod(period)); + int salt = Bytes.toShort(hash) % saltDivisor; + return Bytes.toBytes(salt); --- End diff -- Chiming in in favor of a prefixed hash. @mattf-horton, you're right about the hash preventing hotspotting. Consider the case where we are doing batch updates of profiles (e.g., you have a new profile and you want to backfill data). In that case, you could be sending quite a lot of puts through.
> Create Decodable Row Key for Profiler > ------------------------------------- > > Key: METRON-1005 > URL: https://issues.apache.org/jira/browse/METRON-1005 > Project: Metron > Issue Type: Improvement > Affects Versions: 0.3.0 > Reporter: Nick Allen > Assignee: Nick Allen > Fix For: Next + 1 > > > To be able to answer the types of questions that I outlined in METRON-450, we > need a row key that is decodable. Right now there is no logic to decode a > row key, nor is the existing row key easily decodable. > Once the row keys can be decoded, you could scan all of the row keys in the > Profiler's HBase table, decode each of them, and extract things like the > names of all your profiles, the names of entities within a profile, and the > period duration of a given profile. -- This message was sent by Atlassian JIRA (v6.4.14#64029)