http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java deleted file mode 100644 index c24d730..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ /dev/null @@ -1,376 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by - * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. - */ -package org.apache.phoenix.hbase.index.covered.example; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; -import org.apache.phoenix.hbase.index.covered.IndexMetaData; -import org.apache.phoenix.hbase.index.covered.IndexUpdate; -import org.apache.phoenix.hbase.index.covered.LocalTableState; -import org.apache.phoenix.hbase.index.covered.TableState; -import org.apache.phoenix.hbase.index.scanner.Scanner; -import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; - -import com.google.common.collect.Lists; - -/** - * - */ -public class CoveredColumnIndexCodec extends BaseIndexCodec { - - private static final byte[] EMPTY_BYTES = new byte[0]; - public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS"); - - private List<ColumnGroup> groups; - - /** - * @param groups - * to initialize the codec with - * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing purposes - */ - public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) { - CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec(); - codec.groups = Lists.newArrayList(groups); - return codec; - } - - @Override - public void initialize(RegionCoprocessorEnvironment env) { - groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration()); - } - - @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) { - List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size()); - for (ColumnGroup group : groups) { - IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData); - updates.add(update); - } - return updates; - } - - /** - * @param group - * @param state - * @return the update that should be made to the table - */ - private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) { - List<CoveredColumn> refs = group.getColumns(); - try { - Pair<CoveredDeleteScanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData); - Scanner kvs = stateInfo.getFirst(); - Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey()); - // make sure we close the scanner - kvs.close(); - if (columns.getFirst().intValue() == 0) { return stateInfo.getSecond(); } - // have all the column entries, so just turn it into a Delete for the row - // convert the entries to the needed values - byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); - Put p = new Put(rowKey, state.getCurrentTimestamp()); - // add the columns to the put - addColumnsToPut(p, columns.getSecond()); - - // update the index info - IndexUpdate update = stateInfo.getSecond(); - update.setTable(Bytes.toBytes(group.getTable())); - update.setUpdate(p); - return update; - } catch (IOException e) { - throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); - } - } - - private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) { - // add each of the corresponding families to the put - int count = 0; - for (ColumnEntry column : columns) { - indexInsert.add(INDEX_ROW_COLUMN_FAMILY, - ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null); - } - } - - private static byte[] toIndexQualifier(CoveredColumn column) { - return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), column.getQualifier()); - } - - @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) { - List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size()); - for (ColumnGroup group : groups) { - deletes.add(getDeleteForGroup(group, state, context)); - } - return deletes; - } - - /** - * Get all the deletes necessary for a group of columns - logically, the cleanup the index table for a given index. - * - * @param group - * index information - * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary - */ - private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) { - List<CoveredColumn> refs = group.getColumns(); - try { - Pair<CoveredDeleteScanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData); - Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey()); - // make sure we close the scanner reference - kvs.getFirst().close(); - // no change, just return the passed update - if (columns.getFirst() == 0) { return kvs.getSecond(); } - // have all the column entries, so just turn it into a Delete for the row - // convert the entries to the needed values - byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); - Delete d = new Delete(rowKey); - d.setTimestamp(state.getCurrentTimestamp()); - IndexUpdate update = kvs.getSecond(); - update.setUpdate(d); - update.setTable(Bytes.toBytes(group.getTable())); - return update; - } catch (IOException e) { - throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); - } - } - - /** - * Get the next batch of primary table values for the given columns - * - * @param refs - * columns to match against - * @param state - * @return the total length of all values found and the entries to add for the index - */ - private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow) - throws IOException { - int totalValueLength = 0; - List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size()); - - // pull out the latest state for each column reference, in order - for (CoveredColumn ref : refs) { - KeyValue first = ref.getFirstKeyValueForRow(currentRow); - if (!kvs.seek(first)) { - // no more keys, so add a null value - entries.add(new ColumnEntry(null, ref)); - continue; - } - // there is a next value - we only care about the current value, so we can just snag that - Cell next = kvs.next(); - if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) { - byte[] v = next.getValue(); - totalValueLength += v.length; - entries.add(new ColumnEntry(v, ref)); - } else { - // this first one didn't match at all, so we have to put in a null entry - entries.add(new ColumnEntry(null, ref)); - continue; - } - // here's where is gets a little tricky - we either need to decide if we should continue - // adding entries (matches all qualifiers) or if we are done (matches a single qualifier) - if (!ref.allColumns()) { - continue; - } - // matches all columns, so we need to iterate until we hit the next column with the same - // family as the current key - byte[] lastQual = next.getQualifier(); - byte[] nextQual = null; - while ((next = kvs.next()) != null) { - // different family, done with this column - if (!ref.matchesFamily(next.getFamily())) { - break; - } - nextQual = next.getQualifier(); - // we are still on the same qualifier - skip it, since we already added a column for it - if (Arrays.equals(lastQual, nextQual)) { - continue; - } - // this must match the qualifier since its an all-qualifiers specifier, so we add it - byte[] v = next.getValue(); - totalValueLength += v.length; - entries.add(new ColumnEntry(v, ref)); - // update the last qualifier to check against - lastQual = nextQual; - } - } - return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries); - } - - static class ColumnEntry { - byte[] value = EMPTY_BYTES; - CoveredColumn ref; - - public ColumnEntry(byte[] value, CoveredColumn ref) { - this.value = value == null ? EMPTY_BYTES : value; - this.ref = ref; - } - } - - /** - * Compose the final index row key. - * <p> - * This is faster than adding each value independently as we can just build a single a array and copy everything - * over once. - * - * @param pk - * primary key of the original row - * @param length - * total number of bytes of all the values that should be added - * @param values - * to use when building the key - */ - static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) { - final int numColumnEntries = values.size() * Bytes.SIZEOF_INT; - // now build up expected row key, each of the values, in order, followed by the PK and then some - // info about lengths so we can deserialize each value - // - // output = length of values + primary key + column entries + length of each column entry + number of column entries - byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT]; - int pos = 0; - int[] lengths = new int[values.size()]; - int i = 0; - for (ColumnEntry entry : values) { - byte[] v = entry.value; - // skip doing the copy attempt, if we don't need to - if (v.length != 0) { - System.arraycopy(v, 0, output, pos, v.length); - pos += v.length; - } - lengths[i++] = v.length; - } - - // add the primary key to the end of the row key - System.arraycopy(pk, 0, output, pos, pk.length); - pos += pk.length; - - // add the lengths as suffixes so we can deserialize the elements again - for (int l : lengths) { - byte[] serializedLength = Bytes.toBytes(l); - System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT); - pos += Bytes.SIZEOF_INT; - } - - // and the last integer is the number of values - byte[] serializedNumValues = Bytes.toBytes(values.size()); - System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT); - // Just in case we serialize more in the rowkey in the future.. - pos += Bytes.SIZEOF_INT; - - return output; - } - - /** - * Essentially a short-cut from building a {@link Put}. - * - * @param pk - * row key - * @param timestamp - * timestamp of all the keyvalues - * @param values - * expected value--column pair - * @return a keyvalues that the index contains for a given row at a timestamp with the given value -- column pairs. - */ - public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp, - List<Pair<byte[], CoveredColumn>> values) { - - int length = 0; - List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size()); - for (Pair<byte[], CoveredColumn> value : values) { - ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond()); - length += value.getFirst().length; - expected.add(entry); - } - - byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected); - Put p = new Put(rowKey, timestamp); - CoveredColumnIndexCodec.addColumnsToPut(p, expected); - List<KeyValue> kvs = new ArrayList<KeyValue>(); - for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) { - kvs.addAll(entry.getValue()); - } - - return kvs; - } - - public static List<byte[]> getValues(byte[] bytes) { - // get the total number of keys in the bytes - int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length); - List<byte[]> keys = new ArrayList<byte[]>(keyCount); - int[] lengths = new int[keyCount]; - int lengthPos = keyCount - 1; - int pos = bytes.length - Bytes.SIZEOF_INT; - // figure out the length of each key - for (int i = 0; i < keyCount; i++) { - lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos); - pos -= Bytes.SIZEOF_INT; - } - - int current = 0; - for (int length : lengths) { - byte[] key = Arrays.copyOfRange(bytes, current, current + length); - keys.add(key); - current += length; - } - - return keys; - } - - /** - * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes - * - * @param bytes - * array to read from - * @param start - * start point, backwards from which to read. For example, if specifying "25", we would try to read an - * integer from 21 -> 25 - * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists. - */ - private static int getPreviousInteger(byte[] bytes, int start) { - return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT); - } - - /** - * Check to see if an row key just contains a list of null values. - * - * @param bytes - * row key to examine - * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise - */ - public static boolean checkRowKeyForAllNulls(byte[] bytes) { - int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length); - int pos = bytes.length - Bytes.SIZEOF_INT; - for (int i = 0; i < keyCount; i++) { - int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos); - if (next > 0) { return false; } - pos -= Bytes.SIZEOF_INT; - } - - return true; - } - - @Override - public boolean isEnabled(Mutation m) { - // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this - // simple check for the moment. - return groups.size() > 0; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java deleted file mode 100644 index 48c714d..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.phoenix.hbase.index.covered.example; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Coprocessor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.phoenix.hbase.index.Indexer; -import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; -import org.apache.phoenix.hbase.index.covered.IndexCodec; - -/** - * Helper to build the configuration for the {@link CoveredColumnIndexer}. - * <p> - * This class is NOT thread-safe; all concurrent access must be managed externally. - */ -public class CoveredColumnIndexSpecifierBuilder { - - private static final String INDEX_TO_TABLE_CONF_PREFX = "hbase.index.covered."; - // number of index 'groups'. Each group represents a set of 'joined' columns. The data stored with - // each joined column are either just the columns in the group or all the most recent data in the - // row (a fully covered index). - private static final String COUNT = ".count"; - private static final String INDEX_GROUPS_COUNT_KEY = INDEX_TO_TABLE_CONF_PREFX + ".groups" + COUNT; - private static final String INDEX_GROUP_PREFIX = INDEX_TO_TABLE_CONF_PREFX + "group."; - private static final String INDEX_GROUP_COVERAGE_SUFFIX = ".columns"; - private static final String TABLE_SUFFIX = ".table"; - - // right now, we don't support this should be easy enough to add later - // private static final String INDEX_GROUP_FULLY_COVERED = ".covered"; - - List<ColumnGroup> groups = new ArrayList<ColumnGroup>(); - private Map<String, String> specs = new HashMap<String, String>(); - - /** - * Add a group of columns to index - * @param columns Pairs of cf:cq (full specification of a column) to index - * @return the index of the group. This can be used to remove or modify the group via - * {@link #remove(int)} or {@link #get(int)}, any time before building - */ - public int addIndexGroup(ColumnGroup columns) { - if (columns == null || columns.size() == 0) { - throw new IllegalArgumentException("Must specify some columns to index!"); - } - int size = this.groups.size(); - this.groups.add(columns); - return size; - } - - public void remove(int i) { - this.groups.remove(i); - } - - public ColumnGroup get(int i) { - return this.groups.get(i); - } - - /** - * Clear the stored {@link ColumnGroup}s for resuse. - */ - public void reset() { - this.groups.clear(); - } - - Map<String, String> convertToMap() { - int total = this.groups.size(); - // hbase.index.covered.groups = i - specs.put(INDEX_GROUPS_COUNT_KEY, Integer.toString(total)); - - int i = 0; - for (ColumnGroup group : groups) { - addIndexGroupToSpecs(specs, group, i++); - } - - return specs; - } - - /** - * @param specs - * @param columns - * @param index - */ - private void addIndexGroupToSpecs(Map<String, String> specs, ColumnGroup columns, int index) { - // hbase.index.covered.group.<i> - String prefix = INDEX_GROUP_PREFIX + Integer.toString(index); - - // set the table to which the group writes - // hbase.index.covered.group.<i>.table - specs.put(prefix + TABLE_SUFFIX, columns.getTable()); - - // a different key for each column in the group - // hbase.index.covered.group.<i>.columns - String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX; - // hbase.index.covered.group.<i>.columns.count = <j> - String columnsSizeKey = columnPrefix + COUNT; - specs.put(columnsSizeKey, Integer.toString(columns.size())); - - // add each column in the group - int i=0; - for (CoveredColumn column : columns) { - // hbase.index.covered.group.<i>.columns.<j> - String nextKey = columnPrefix + "." + Integer.toString(i); - String nextValue = column.serialize(); - specs.put(nextKey, nextValue); - i++; - } - } - - public void build(HTableDescriptor desc) throws IOException { - build(desc, CoveredColumnIndexCodec.class); - } - - void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException { - // add the codec for the index to the map of options - Map<String, String> opts = this.convertToMap(); - opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName()); - Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER); - } - - static List<ColumnGroup> getColumns(Configuration conf) { - int count= conf.getInt(INDEX_GROUPS_COUNT_KEY, 0); - if (count ==0) { - return Collections.emptyList(); - } - - // parse out all the column groups we should index - List<ColumnGroup> columns = new ArrayList<ColumnGroup>(count); - for (int i = 0; i < count; i++) { - // parse out each group - String prefix = INDEX_GROUP_PREFIX + i; - - // hbase.index.covered.group.<i>.table - String table = conf.get(prefix + TABLE_SUFFIX); - ColumnGroup group = new ColumnGroup(table); - - // parse out each column in the group - // hbase.index.covered.group.<i>.columns - String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX; - // hbase.index.covered.group.<i>.columns.count = j - String columnsSizeKey = columnPrefix + COUNT; - int columnCount = conf.getInt(columnsSizeKey, 0); - for(int j=0; j< columnCount; j++){ - String columnKey = columnPrefix + "." + j; - CoveredColumn column = CoveredColumn.parse(conf.get(columnKey)); - group.add(column); - } - - // add the group - columns.add(group); - } - return columns; - } - - /** - * @param key - * @param value - */ - public void addArbitraryConfigForTesting(String key, String value) { - this.specs.put(key, value); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java deleted file mode 100644 index 925bcbb..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.phoenix.hbase.index.covered.example; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.hbase.index.covered.Batch; -import org.apache.phoenix.hbase.index.covered.IndexMetaData; -import org.apache.phoenix.hbase.index.covered.LocalTableState; -import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; -import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; - -/** - * Index maintainer that maintains multiple indexes based on '{@link ColumnGroup}s'. Each group is a - * fully covered within itself and stores the fully 'pre-joined' version of that values for that - * group of columns. - * <p> - * <h2>Index Layout</h2> The row key for a given index entry is the current state of the all the - * values of the columns in a column group, followed by the primary key (row key) of the original - * row, and then the length of each value and then finally the total number of values. This is then - * enough information to completely rebuild the latest value of row for each column in the group. - * <p> - * The family is always {@link CoveredColumnIndexCodec#INDEX_ROW_COLUMN_FAMILY} - * <p> - * The qualifier is prepended with the integer index (serialized with {@link Bytes#toBytes(int)}) of - * the column in the group. This index corresponds the index of the value for the group in the row - * key. - * - * <pre> - * ROW || FAMILY || QUALIFIER || VALUE - * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY || 1Cf1:Cq1 || null - * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY || 2Cf2:Cq2 || null - * ... - * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY || NCfN:CqN || null - * </pre> - * - * <h2>Index Maintenance</h2> - * <p> - * When making an insertion into the table, we also attempt to cleanup the index. This means that we - * need to remove the previous entry from the index. Generally, this is completed by inserting a - * delete at the previous value of the previous row. - * <p> - * The main caveat here is when dealing with custom timestamps. If there is no special timestamp - * specified, we can just insert the proper {@link Delete} at the current timestamp and move on. - * However, when the client specifies a timestamp, we could see updates out of order. In that case, - * we can do an insert using the specified timestamp, but a delete is different... - * <p> - * Taking the simple case, assume we do a single column in a group. Then if we get an out of order - * update, we need to check the current state of that column in the current row. If the current row - * is older, we can issue a delete as normal. If the current row is newer, however, we then have to - * issue a delete for the index update at the time of the current row. This ensures that the index - * update made for the 'future' time still covers the existing row. - * <p> - * <b>ASSUMPTION:</b> all key-values in a single {@link Delete}/{@link Put} have the same timestamp. - * This dramatically simplifies the logic needed to manage updating the index for out-of-order - * {@link Put}s as we don't need to manage multiple levels of timestamps across multiple columns. - * <p> - * We can extend this to multiple columns by picking the latest update of any column in group as the - * delete point. - * <p> - * <b>NOTE:</b> this means that we need to do a lookup (point {@link Get}) of the entire row - * <i>every time there is a write to the table</i>. - */ -public class CoveredColumnIndexer extends NonTxIndexBuilder { - - /** - * Create the specified index table with the necessary columns - * @param admin {@link HBaseAdmin} to use when creating the table - * @param indexTable name of the index table. - * @throws IOException - */ - public static void createIndexTable(HBaseAdmin admin, String indexTable) throws IOException { - createIndexTable(admin, new HTableDescriptor(indexTable)); - } - - /** - * @param admin to create the table - * @param index descriptor to update before creating table - */ - public static void createIndexTable(HBaseAdmin admin, HTableDescriptor index) throws IOException { - HColumnDescriptor col = - new HColumnDescriptor(CoveredColumnIndexCodec.INDEX_ROW_COLUMN_FAMILY); - // ensure that we can 'see past' delete markers when doing scans - col.setKeepDeletedCells(true); - index.addFamily(col); - admin.createTable(index); - } - - @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows( - Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException { - // stores all the return values - IndexUpdateManager updateMap = new IndexUpdateManager(indexMetaData); - // batch the updates by row to make life easier and ordered - Collection<Batch> batches = batchByRow(filtered); - - for (Batch batch : batches) { - Cell curKV = batch.getKvs().iterator().next(); - Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength()); - for (Cell kv : batch.getKvs()) { - // we only need to cleanup Put entries - byte type = kv.getTypeByte(); - Type t = KeyValue.Type.codeToType(type); - if (!t.equals(Type.Put)) { - continue; - } - - // add the kv independently - p.add(kv); - } - - // do the usual thing as for deletes - Collection<Batch> timeBatch = IndexManagementUtil.createTimestampBatchesFromMutation(p); - LocalTableState state = new LocalTableState(env, localTable, p); - for (Batch entry : timeBatch) { - //just set the timestamp on the table - it already has all the future state - state.setCurrentTimestamp(entry.getTimestamp()); - this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp(), indexMetaData); - } - } - return updateMap.toMap(); - } - - - /** - * @param filtered - */ - private Collection<Batch> batchByRow(Collection<KeyValue> filtered) { - Map<Long, Batch> batches = new HashMap<Long, Batch>(); - IndexManagementUtil.createTimestampBatchesFromKeyValues(filtered, batches); - return batches.values(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/ColumnGroup.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/ColumnGroup.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/ColumnGroup.java new file mode 100644 index 0000000..81b5805 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/ColumnGroup.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.hbase.index.covered; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.util.Bytes; + + +/** + * A collection of {@link CoveredColumn}s that should be included in a covered index. + */ +public class ColumnGroup implements Iterable<CoveredColumn> { + + private List<CoveredColumn> columns = new ArrayList<CoveredColumn>(); + private String table; + + public ColumnGroup(String tableName) { + this.table = tableName; + } + + public void add(CoveredColumn column) { + this.columns.add(column); + } + + public String getTable() { + return table; + } + + /** + * Check to see if any {@link CoveredColumn} in <tt>this</tt> matches the given family + * @param family to check + * @return <tt>true</tt> if any column covers the family + */ + public boolean matches(String family) { + for (CoveredColumn column : columns) { + if (column.matchesFamily(family)) { + return true; + } + } + + return false; + } + + /** + * Check to see if any column matches the family/qualifier pair + * @param family family to match against + * @param qualifier qualifier to match, can be <tt>null</tt>, in which case we match all + * qualifiers + * @return <tt>true</tt> if any column matches, <tt>false</tt> otherwise + */ + public boolean matches(byte[] family, byte[] qualifier) { + // families are always printable characters + String fam = Bytes.toString(family); + for (CoveredColumn column : columns) { + if (column.matchesFamily(fam)) { + // check the qualifier + if (column.matchesQualifier(qualifier)) { + return true; + } + } + } + return false; + } + + /** + * @return the number of columns in the group + */ + public int size() { + return this.columns.size(); + } + + @Override + public Iterator<CoveredColumn> iterator() { + return columns.iterator(); + } + + /** + * @param index index of the column to get + * @return the specified column + */ + public CoveredColumn getColumnForTesting(int index) { + return this.columns.get(index); + } + + @Override + public String toString() { + return "ColumnGroup - table: " + table + ", columns: " + columns; + } + + public List<CoveredColumn> getColumns() { + return this.columns; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumn.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumn.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumn.java new file mode 100644 index 0000000..bb03f56 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumn.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.hbase.index.covered; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; + +/** + * A single Column (either a Column Family or a full Family:Qualifier pair) in a {@link ColumnGroup} + * . If no column qualifier is specified (null), matches all known qualifiers of the family. + */ +public class CoveredColumn extends ColumnReference { + + public static final String SEPARATOR = ":"; + String familyString; + private final int hashCode; + + public CoveredColumn(byte[] family, byte[] qualifier){ + this(Bytes.toString(family), qualifier); + } + + public CoveredColumn(String family, byte[] qualifier) { + super(Bytes.toBytes(family), qualifier == null ? ColumnReference.ALL_QUALIFIERS : qualifier); + this.familyString = family; + this.hashCode = calcHashCode(family, qualifier); + } + + public static CoveredColumn parse(String spec) { + int sep = spec.indexOf(SEPARATOR); + if (sep < 0) { + throw new IllegalArgumentException(spec + " is not a valid specifier!"); + } + String family = spec.substring(0, sep); + String qual = spec.substring(sep + 1); + byte[] column = qual.length() == 0 ? null : Bytes.toBytes(qual); + return new CoveredColumn(family, column); + } + + public String serialize() { + return CoveredColumn.serialize(familyString, getQualifier()); + } + + public static String serialize(String first, byte[] second) { + String nextValue = first + CoveredColumn.SEPARATOR; + if (second != null) { + nextValue += Bytes.toString(second); + } + return nextValue; + } + + /** + * @param family2 to check + * @return <tt>true</tt> if the passed family matches the family this column covers + */ + public boolean matchesFamily(String family2) { + return this.familyString.equals(family2); + } + + @Override + public int hashCode() { + return hashCode; + } + + private static int calcHashCode(String familyString, byte[] qualifier) { + final int prime = 31; + int result = 1; + result = prime * result + familyString.hashCode(); + if (qualifier != null) { + result = prime * result + Bytes.hashCode(qualifier); + } + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + CoveredColumn other = (CoveredColumn) obj; + if (hashCode != other.hashCode) return false; + if (!familyString.equals(other.familyString)) return false; + return Bytes.equals(getQualifier(), other.getQualifier()); + } + + @Override + public String toString() { + String qualString = getQualifier() == null ? "null" : Bytes.toString(getQualifier()); + return "CoveredColumn:[" + familyString + ":" + qualString + "]"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java new file mode 100644 index 0000000..3f6a552 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; +import org.apache.phoenix.hbase.index.scanner.Scanner; +import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; + +import com.google.common.collect.Lists; + +/** + * + */ +public class CoveredColumnIndexCodec extends BaseIndexCodec { + + private static final byte[] EMPTY_BYTES = new byte[0]; + public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS"); + + private List<ColumnGroup> groups; + + /** + * @param groups + * to initialize the codec with + * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing purposes + */ + public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) { + CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec(); + codec.groups = Lists.newArrayList(groups); + return codec; + } + + @Override + public void initialize(RegionCoprocessorEnvironment env) { + groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration()); + } + + @Override + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) { + List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size()); + for (ColumnGroup group : groups) { + IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData); + updates.add(update); + } + return updates; + } + + /** + * @param group + * @param state + * @return the update that should be made to the table + */ + private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) { + List<CoveredColumn> refs = group.getColumns(); + try { + Pair<CoveredDeleteScanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData); + Scanner kvs = stateInfo.getFirst(); + Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey()); + // make sure we close the scanner + kvs.close(); + if (columns.getFirst().intValue() == 0) { return stateInfo.getSecond(); } + // have all the column entries, so just turn it into a Delete for the row + // convert the entries to the needed values + byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); + Put p = new Put(rowKey, state.getCurrentTimestamp()); + // add the columns to the put + addColumnsToPut(p, columns.getSecond()); + + // update the index info + IndexUpdate update = stateInfo.getSecond(); + update.setTable(Bytes.toBytes(group.getTable())); + update.setUpdate(p); + return update; + } catch (IOException e) { + throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); + } + } + + private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) { + // add each of the corresponding families to the put + int count = 0; + for (ColumnEntry column : columns) { + indexInsert.add(INDEX_ROW_COLUMN_FAMILY, + ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null); + } + } + + private static byte[] toIndexQualifier(CoveredColumn column) { + return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), column.getQualifier()); + } + + @Override + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) { + List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size()); + for (ColumnGroup group : groups) { + deletes.add(getDeleteForGroup(group, state, context)); + } + return deletes; + } + + /** + * Get all the deletes necessary for a group of columns - logically, the cleanup the index table for a given index. + * + * @param group + * index information + * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary + */ + private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) { + List<CoveredColumn> refs = group.getColumns(); + try { + Pair<CoveredDeleteScanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData); + Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey()); + // make sure we close the scanner reference + kvs.getFirst().close(); + // no change, just return the passed update + if (columns.getFirst() == 0) { return kvs.getSecond(); } + // have all the column entries, so just turn it into a Delete for the row + // convert the entries to the needed values + byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); + Delete d = new Delete(rowKey); + d.setTimestamp(state.getCurrentTimestamp()); + IndexUpdate update = kvs.getSecond(); + update.setUpdate(d); + update.setTable(Bytes.toBytes(group.getTable())); + return update; + } catch (IOException e) { + throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); + } + } + + /** + * Get the next batch of primary table values for the given columns + * + * @param refs + * columns to match against + * @param state + * @return the total length of all values found and the entries to add for the index + */ + private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow) + throws IOException { + int totalValueLength = 0; + List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size()); + + // pull out the latest state for each column reference, in order + for (CoveredColumn ref : refs) { + KeyValue first = ref.getFirstKeyValueForRow(currentRow); + if (!kvs.seek(first)) { + // no more keys, so add a null value + entries.add(new ColumnEntry(null, ref)); + continue; + } + // there is a next value - we only care about the current value, so we can just snag that + Cell next = kvs.next(); + if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) { + byte[] v = next.getValue(); + totalValueLength += v.length; + entries.add(new ColumnEntry(v, ref)); + } else { + // this first one didn't match at all, so we have to put in a null entry + entries.add(new ColumnEntry(null, ref)); + continue; + } + // here's where is gets a little tricky - we either need to decide if we should continue + // adding entries (matches all qualifiers) or if we are done (matches a single qualifier) + if (!ref.allColumns()) { + continue; + } + // matches all columns, so we need to iterate until we hit the next column with the same + // family as the current key + byte[] lastQual = next.getQualifier(); + byte[] nextQual = null; + while ((next = kvs.next()) != null) { + // different family, done with this column + if (!ref.matchesFamily(next.getFamily())) { + break; + } + nextQual = next.getQualifier(); + // we are still on the same qualifier - skip it, since we already added a column for it + if (Arrays.equals(lastQual, nextQual)) { + continue; + } + // this must match the qualifier since its an all-qualifiers specifier, so we add it + byte[] v = next.getValue(); + totalValueLength += v.length; + entries.add(new ColumnEntry(v, ref)); + // update the last qualifier to check against + lastQual = nextQual; + } + } + return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries); + } + + public static class ColumnEntry { + byte[] value = EMPTY_BYTES; + CoveredColumn ref; + + public ColumnEntry(byte[] value, CoveredColumn ref) { + this.value = value == null ? EMPTY_BYTES : value; + this.ref = ref; + } + } + + /** + * Compose the final index row key. + * <p> + * This is faster than adding each value independently as we can just build a single a array and copy everything + * over once. + * + * @param pk + * primary key of the original row + * @param length + * total number of bytes of all the values that should be added + * @param values + * to use when building the key + */ + public static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) { + final int numColumnEntries = values.size() * Bytes.SIZEOF_INT; + // now build up expected row key, each of the values, in order, followed by the PK and then some + // info about lengths so we can deserialize each value + // + // output = length of values + primary key + column entries + length of each column entry + number of column entries + byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT]; + int pos = 0; + int[] lengths = new int[values.size()]; + int i = 0; + for (ColumnEntry entry : values) { + byte[] v = entry.value; + // skip doing the copy attempt, if we don't need to + if (v.length != 0) { + System.arraycopy(v, 0, output, pos, v.length); + pos += v.length; + } + lengths[i++] = v.length; + } + + // add the primary key to the end of the row key + System.arraycopy(pk, 0, output, pos, pk.length); + pos += pk.length; + + // add the lengths as suffixes so we can deserialize the elements again + for (int l : lengths) { + byte[] serializedLength = Bytes.toBytes(l); + System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT); + pos += Bytes.SIZEOF_INT; + } + + // and the last integer is the number of values + byte[] serializedNumValues = Bytes.toBytes(values.size()); + System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT); + // Just in case we serialize more in the rowkey in the future.. + pos += Bytes.SIZEOF_INT; + + return output; + } + + /** + * Essentially a short-cut from building a {@link Put}. + * + * @param pk + * row key + * @param timestamp + * timestamp of all the keyvalues + * @param values + * expected value--column pair + * @return a keyvalues that the index contains for a given row at a timestamp with the given value -- column pairs. + */ + public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp, + List<Pair<byte[], CoveredColumn>> values) { + + int length = 0; + List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size()); + for (Pair<byte[], CoveredColumn> value : values) { + ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond()); + length += value.getFirst().length; + expected.add(entry); + } + + byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected); + Put p = new Put(rowKey, timestamp); + CoveredColumnIndexCodec.addColumnsToPut(p, expected); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) { + kvs.addAll(entry.getValue()); + } + + return kvs; + } + + public static List<byte[]> getValues(byte[] bytes) { + // get the total number of keys in the bytes + int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length); + List<byte[]> keys = new ArrayList<byte[]>(keyCount); + int[] lengths = new int[keyCount]; + int lengthPos = keyCount - 1; + int pos = bytes.length - Bytes.SIZEOF_INT; + // figure out the length of each key + for (int i = 0; i < keyCount; i++) { + lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos); + pos -= Bytes.SIZEOF_INT; + } + + int current = 0; + for (int length : lengths) { + byte[] key = Arrays.copyOfRange(bytes, current, current + length); + keys.add(key); + current += length; + } + + return keys; + } + + /** + * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes + * + * @param bytes + * array to read from + * @param start + * start point, backwards from which to read. For example, if specifying "25", we would try to read an + * integer from 21 -> 25 + * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists. + */ + private static int getPreviousInteger(byte[] bytes, int start) { + return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT); + } + + /** + * Check to see if an row key just contains a list of null values. + * + * @param bytes + * row key to examine + * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise + */ + public static boolean checkRowKeyForAllNulls(byte[] bytes) { + int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length); + int pos = bytes.length - Bytes.SIZEOF_INT; + for (int i = 0; i < keyCount; i++) { + int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos); + if (next > 0) { return false; } + pos -= Bytes.SIZEOF_INT; + } + + return true; + } + + @Override + public boolean isEnabled(Mutation m) { + // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this + // simple check for the moment. + return groups.size() > 0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexSpecifierBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexSpecifierBuilder.java new file mode 100644 index 0000000..c7b2685 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexSpecifierBuilder.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.hbase.index.covered; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.phoenix.hbase.index.Indexer; + +/** + * Helper to build the configuration for the {@link NonTxIndexBuilder}. + * <p> + * This class is NOT thread-safe; all concurrent access must be managed externally. + */ +public class CoveredColumnIndexSpecifierBuilder { + + private static final String INDEX_TO_TABLE_CONF_PREFX = "hbase.index.covered."; + // number of index 'groups'. Each group represents a set of 'joined' columns. The data stored with + // each joined column are either just the columns in the group or all the most recent data in the + // row (a fully covered index). + private static final String COUNT = ".count"; + private static final String INDEX_GROUPS_COUNT_KEY = INDEX_TO_TABLE_CONF_PREFX + ".groups" + COUNT; + private static final String INDEX_GROUP_PREFIX = INDEX_TO_TABLE_CONF_PREFX + "group."; + private static final String INDEX_GROUP_COVERAGE_SUFFIX = ".columns"; + private static final String TABLE_SUFFIX = ".table"; + + // right now, we don't support this should be easy enough to add later + // private static final String INDEX_GROUP_FULLY_COVERED = ".covered"; + + List<ColumnGroup> groups = new ArrayList<ColumnGroup>(); + private Map<String, String> specs = new HashMap<String, String>(); + + /** + * Add a group of columns to index + * @param columns Pairs of cf:cq (full specification of a column) to index + * @return the index of the group. This can be used to remove or modify the group via + * {@link #remove(int)} or {@link #get(int)}, any time before building + */ + public int addIndexGroup(ColumnGroup columns) { + if (columns == null || columns.size() == 0) { + throw new IllegalArgumentException("Must specify some columns to index!"); + } + int size = this.groups.size(); + this.groups.add(columns); + return size; + } + + public void remove(int i) { + this.groups.remove(i); + } + + public ColumnGroup get(int i) { + return this.groups.get(i); + } + + /** + * Clear the stored {@link ColumnGroup}s for resuse. + */ + public void reset() { + this.groups.clear(); + } + + public Map<String, String> convertToMap() { + int total = this.groups.size(); + // hbase.index.covered.groups = i + specs.put(INDEX_GROUPS_COUNT_KEY, Integer.toString(total)); + + int i = 0; + for (ColumnGroup group : groups) { + addIndexGroupToSpecs(specs, group, i++); + } + + return specs; + } + + /** + * @param specs + * @param columns + * @param index + */ + private void addIndexGroupToSpecs(Map<String, String> specs, ColumnGroup columns, int index) { + // hbase.index.covered.group.<i> + String prefix = INDEX_GROUP_PREFIX + Integer.toString(index); + + // set the table to which the group writes + // hbase.index.covered.group.<i>.table + specs.put(prefix + TABLE_SUFFIX, columns.getTable()); + + // a different key for each column in the group + // hbase.index.covered.group.<i>.columns + String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX; + // hbase.index.covered.group.<i>.columns.count = <j> + String columnsSizeKey = columnPrefix + COUNT; + specs.put(columnsSizeKey, Integer.toString(columns.size())); + + // add each column in the group + int i=0; + for (CoveredColumn column : columns) { + // hbase.index.covered.group.<i>.columns.<j> + String nextKey = columnPrefix + "." + Integer.toString(i); + String nextValue = column.serialize(); + specs.put(nextKey, nextValue); + i++; + } + } + + public void build(HTableDescriptor desc) throws IOException { + build(desc, CoveredColumnIndexCodec.class); + } + + public void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException { + // add the codec for the index to the map of options + Map<String, String> opts = this.convertToMap(); + opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName()); + Indexer.enableIndexing(desc, NonTxIndexBuilder.class, opts, Coprocessor.PRIORITY_USER); + } + + public static List<ColumnGroup> getColumns(Configuration conf) { + int count= conf.getInt(INDEX_GROUPS_COUNT_KEY, 0); + if (count ==0) { + return Collections.emptyList(); + } + + // parse out all the column groups we should index + List<ColumnGroup> columns = new ArrayList<ColumnGroup>(count); + for (int i = 0; i < count; i++) { + // parse out each group + String prefix = INDEX_GROUP_PREFIX + i; + + // hbase.index.covered.group.<i>.table + String table = conf.get(prefix + TABLE_SUFFIX); + ColumnGroup group = new ColumnGroup(table); + + // parse out each column in the group + // hbase.index.covered.group.<i>.columns + String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX; + // hbase.index.covered.group.<i>.columns.count = j + String columnsSizeKey = columnPrefix + COUNT; + int columnCount = conf.getInt(columnsSizeKey, 0); + for(int j=0; j< columnCount; j++){ + String columnKey = columnPrefix + "." + j; + CoveredColumn column = CoveredColumn.parse(conf.get(columnKey)); + group.add(column); + } + + // add the group + columns.add(group); + } + return columns; + } + + /** + * @param key + * @param value + */ + public void addArbitraryConfigForTesting(String key, String value) { + this.specs.put(key, value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestColumnTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestColumnTracker.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestColumnTracker.java new file mode 100644 index 0000000..7c5de5f --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestColumnTracker.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; + +import org.junit.Test; + +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; + +public class TestColumnTracker { + + @Test + public void testEnsureGuarranteedMinValid() { + assertFalse("Guarranted min wasn't recognized as having newer timestamps!", + ColumnTracker.isNewestTime(ColumnTracker.GUARANTEED_NEWER_UPDATES)); + } + + @Test + public void testOnlyKeepsOlderTimestamps() { + Collection<ColumnReference> columns = new ArrayList<ColumnReference>(); + ColumnTracker tracker = new ColumnTracker(columns); + tracker.setTs(10); + assertEquals("Column tracker didn't set original TS", 10, tracker.getTS()); + tracker.setTs(12); + assertEquals("Column tracker allowed newer timestamp to be set.", 10, tracker.getTS()); + tracker.setTs(9); + assertEquals("Column tracker didn't decrease set timestamp for smaller value", 9, + tracker.getTS()); + } + + @Test + public void testHasNewerTimestamps() throws Exception { + Collection<ColumnReference> columns = new ArrayList<ColumnReference>(); + ColumnTracker tracker = new ColumnTracker(columns); + assertFalse("Tracker has newer timestamps when no ts set", tracker.hasNewerTimestamps()); + tracker.setTs(10); + assertTrue("Tracker doesn't have newer timetamps with set ts", tracker.hasNewerTimestamps()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java new file mode 100644 index 0000000..52a238f --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.covered.CoveredColumnIndexCodec.ColumnEntry; +import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Lists; + +public class TestCoveredColumnIndexCodec { + private static final byte[] PK = new byte[] { 'a' }; + private static final String FAMILY_STRING = "family"; + private static final byte[] FAMILY = Bytes.toBytes(FAMILY_STRING); + private static final byte[] QUAL = Bytes.toBytes("qual"); + private static final CoveredColumn COLUMN_REF = new CoveredColumn(FAMILY_STRING, QUAL); + private static final byte[] EMPTY_INDEX_KEY = CoveredColumnIndexCodec.composeRowKey(PK, 0, + Arrays.asList(toColumnEntry(new byte[0]))); + private static final byte[] BLANK_INDEX_KEY = CoveredColumnIndexCodec.composeRowKey(PK, 0, + Collections.<ColumnEntry> emptyList()); + + private static ColumnEntry toColumnEntry(byte[] bytes) { + return new ColumnEntry(bytes, COLUMN_REF); + } + + /** + * Convert between an index and a bunch of values + * @throws Exception + */ + @Test + public void toFromIndexKey() throws Exception { + // start with empty values + byte[] indexKey = BLANK_INDEX_KEY; + List<byte[]> stored = CoveredColumnIndexCodec.getValues(indexKey); + assertEquals("Found some stored values in an index row key that wasn't created with values!", + 0, stored.size()); + + // a single, empty value + indexKey = EMPTY_INDEX_KEY; + stored = CoveredColumnIndexCodec.getValues(indexKey); + assertEquals("Found some stored values in an index row key that wasn't created with values!", + 1, stored.size()); + assertEquals("Found a non-zero length value: " + Bytes.toString(stored.get(0)), 0, + stored.get(0).length); + + // try with a couple values, some different lengths + byte[] v1 = new byte[] { 'a' }; + byte[] v2 = new byte[] { 'b' }; + byte[] v3 = Bytes.toBytes("v3"); + int len = v1.length + v2.length + v3.length; + indexKey = + CoveredColumnIndexCodec.composeRowKey(PK, len, + Arrays.asList(toColumnEntry(v1), toColumnEntry(v2), toColumnEntry(v3))); + stored = CoveredColumnIndexCodec.getValues(indexKey); + assertEquals("Didn't find expected number of values in index key!", 3, stored.size()); + assertTrue("First index keys don't match!", Bytes.equals(v1, stored.get(0))); + assertTrue("Second index keys don't match!", Bytes.equals(v2, stored.get(1))); + assertTrue("Third index keys don't match!", Bytes.equals(v3, stored.get(2))); + } + + /** + * Ensure that we correctly can determine when a row key is empty (no values). + */ + @Test + public void testCheckRowKeyForAllNulls() { + byte[] pk = new byte[] { 'a', 'b', 'z' }; + // check positive cases first + byte[] result = EMPTY_INDEX_KEY; + assertTrue("Didn't correctly read single element as being null in row key", + CoveredColumnIndexCodec.checkRowKeyForAllNulls(result)); + result = + CoveredColumnIndexCodec.composeRowKey(pk, 0, + Lists.newArrayList(toColumnEntry(new byte[0]), toColumnEntry(new byte[0]))); + assertTrue("Didn't correctly read two elements as being null in row key", + CoveredColumnIndexCodec.checkRowKeyForAllNulls(result)); + + // check cases where it isn't null + result = + CoveredColumnIndexCodec.composeRowKey(pk, 2, + Arrays.asList(toColumnEntry(new byte[] { 1, 2 }))); + assertFalse("Found a null key, when it wasn't!", + CoveredColumnIndexCodec.checkRowKeyForAllNulls(result)); + result = + CoveredColumnIndexCodec.composeRowKey(pk, 2, + Arrays.asList(toColumnEntry(new byte[] { 1, 2 }), toColumnEntry(new byte[0]))); + assertFalse("Found a null key, when it wasn't!", + CoveredColumnIndexCodec.checkRowKeyForAllNulls(result)); + } + + private static class SimpleTableState implements LocalHBaseState { + + private Result r; + + public SimpleTableState(Result r) { + this.r = r; + } + + @Override + public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly) + throws IOException { + return r; + } + + } + + /** + * Test that we get back the correct index updates for a given column group + * @throws Exception on failure + */ + @Test + public void testGeneratedIndexUpdates() throws Exception { + ColumnGroup group = new ColumnGroup("test-column-group"); + group.add(COLUMN_REF); + + final Result emptyState = Result.create(Collections.<Cell> emptyList()); + + // setup the state we expect for the codec + RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf = new Configuration(false); + Mockito.when(env.getConfiguration()).thenReturn(conf); + LocalHBaseState table = new SimpleTableState(emptyState); + + // make a new codec on those kvs + CoveredColumnIndexCodec codec = + CoveredColumnIndexCodec.getCodecForTesting(Arrays.asList(group)); + + // start with a basic put that has some keyvalues + Put p = new Put(PK); + // setup the kvs to add + List<KeyValue> kvs = new ArrayList<KeyValue>(); + byte[] v1 = Bytes.toBytes("v1"); + KeyValue kv = new KeyValue(PK, FAMILY, QUAL, 1, v1); + kvs.add(kv); + p.add(kv); + byte[] v2 = Bytes.toBytes("v2"); + kv = new KeyValue(PK, Bytes.toBytes("family2"), QUAL, 1, v2); + kvs.add(kv); + p.add(kv); + + // check the codec for deletes it should send + LocalTableState state = new LocalTableState(env, table, p); + Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA); + assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next() + .isValid()); + + // get the updates with the pending update + state.setCurrentTimestamp(1); + state.addPendingUpdates(kvs); + updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA); + assertTrue("Didn't find index updates for pending primary table update!", updates.iterator() + .hasNext()); + for (IndexUpdate update : updates) { + assertTrue("Update marked as invalid, but should be a pending index write!", update.isValid()); + Put m = (Put) update.getUpdate(); + // should just be the single update for the column reference + byte[] expected = + CoveredColumnIndexCodec.composeRowKey(PK, v1.length, Arrays.asList(toColumnEntry(v1))); + assertArrayEquals("Didn't get expected index value", expected, m.getRow()); + } + + // then apply a delete + Delete d = new Delete(PK, 2); + // need to set the timestamp here, as would actually happen on the server, unlike what happens + // with puts, where the get the constructor specified timestamp for unspecified methods. + d.deleteFamily(FAMILY, 2); + // setup the next batch of 'current state', basically just ripping out the current state from + // the last round + table = new SimpleTableState(new Result(kvs)); + state = new LocalTableState(env, table, d); + state.setCurrentTimestamp(2); + // check the cleanup of the current table, after the puts (mocking a 'next' update) + updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA); + for (IndexUpdate update : updates) { + assertTrue("Didn't have any index cleanup, even though there is current state", + update.isValid()); + Delete m = (Delete) update.getUpdate(); + // should just be the single update for the column reference + byte[] expected = + CoveredColumnIndexCodec.composeRowKey(PK, v1.length, Arrays.asList(toColumnEntry(v1))); + assertArrayEquals("Didn't get expected index value", expected, m.getRow()); + } + ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d); + + // now with the delete of the columns + d = new Delete(PK, 2); + d.deleteColumns(FAMILY, QUAL, 2); + ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d); + + // this delete needs to match timestamps exactly, by contract, to have any effect + d = new Delete(PK, 1); + d.deleteColumn(FAMILY, QUAL, 1); + ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d); + } + + private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<KeyValue> currentState, + Delete d) throws IOException { + LocalHBaseState table = new SimpleTableState(new Result(currentState)); + LocalTableState state = new LocalTableState(env, table, d); + state.setCurrentTimestamp(d.getTimeStamp()); + // now we shouldn't see anything when getting the index update + state.addPendingUpdates(d.getFamilyMap().get(FAMILY)); + Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA); + for (IndexUpdate update : updates) { + assertFalse("Had some index updates, though it should have been covered by the delete", + update.isValid()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredIndexSpecifierBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredIndexSpecifierBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredIndexSpecifierBuilder.java new file mode 100644 index 0000000..d8f3ea5 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredIndexSpecifierBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +public class TestCoveredIndexSpecifierBuilder { + private static final String FAMILY = "FAMILY"; + private static final String FAMILY2 = "FAMILY2"; + private static final String INDEX_TABLE = "INDEX_TABLE"; + private static final String INDEX_TABLE2 = "INDEX_TABLE2"; + + + @Test + public void testSimpleSerialziationDeserialization() throws Exception { + byte[] indexed_qualifer = Bytes.toBytes("indexed_qual"); + + //setup the index + CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder(); + ColumnGroup fam1 = new ColumnGroup(INDEX_TABLE); + // match a single family:qualifier pair + CoveredColumn col1 = new CoveredColumn(FAMILY, indexed_qualifer); + fam1.add(col1); + // matches the family2:* columns + CoveredColumn col2 = new CoveredColumn(FAMILY2, null); + fam1.add(col2); + builder.addIndexGroup(fam1); + ColumnGroup fam2 = new ColumnGroup(INDEX_TABLE2); + // match a single family2:qualifier pair + CoveredColumn col3 = new CoveredColumn(FAMILY2, indexed_qualifer); + fam2.add(col3); + builder.addIndexGroup(fam2); + + Configuration conf = new Configuration(false); + //convert the map that HTableDescriptor gets into the conf the coprocessor receives + Map<String, String> map = builder.convertToMap(); + for(Entry<String, String> entry: map.entrySet()){ + conf.set(entry.getKey(), entry.getValue()); + } + + List<ColumnGroup> columns = CoveredColumnIndexSpecifierBuilder.getColumns(conf); + assertEquals("Didn't deserialize the expected number of column groups", 2, columns.size()); + ColumnGroup group = columns.get(0); + assertEquals("Didn't deserialize expected column in first group", col1, group.getColumnForTesting(0)); + assertEquals("Didn't deserialize expected column in first group", col2, group.getColumnForTesting(1)); + group = columns.get(1); + assertEquals("Didn't deserialize expected column in second group", col3, group.getColumnForTesting(0)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b67fa4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestColumnTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestColumnTracker.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestColumnTracker.java deleted file mode 100644 index 02555e9..0000000 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestColumnTracker.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.hbase.index.covered.example; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collection; - -import org.junit.Test; - -import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; - -public class TestColumnTracker { - - @Test - public void testEnsureGuarranteedMinValid() { - assertFalse("Guarranted min wasn't recognized as having newer timestamps!", - ColumnTracker.isNewestTime(ColumnTracker.GUARANTEED_NEWER_UPDATES)); - } - - @Test - public void testOnlyKeepsOlderTimestamps() { - Collection<ColumnReference> columns = new ArrayList<ColumnReference>(); - ColumnTracker tracker = new ColumnTracker(columns); - tracker.setTs(10); - assertEquals("Column tracker didn't set original TS", 10, tracker.getTS()); - tracker.setTs(12); - assertEquals("Column tracker allowed newer timestamp to be set.", 10, tracker.getTS()); - tracker.setTs(9); - assertEquals("Column tracker didn't decrease set timestamp for smaller value", 9, - tracker.getTS()); - } - - @Test - public void testHasNewerTimestamps() throws Exception { - Collection<ColumnReference> columns = new ArrayList<ColumnReference>(); - ColumnTracker tracker = new ColumnTracker(columns); - assertFalse("Tracker has newer timestamps when no ts set", tracker.hasNewerTimestamps()); - tracker.setTs(10); - assertTrue("Tracker doesn't have newer timetamps with set ts", tracker.hasNewerTimestamps()); - } -} \ No newline at end of file