Repository: phoenix Updated Branches: refs/heads/master b36bb31fe -> a0ae8025e
PHOENIX-3148 Reduce size of PTable so that more tables can be cached in the metadata cache Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a0ae8025 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a0ae8025 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a0ae8025 Branch: refs/heads/master Commit: a0ae8025eccbd76b4277f74e57081ddbb5a7babe Parents: b36bb31 Author: Thomas D'Silva <tdsi...@salesforce.com> Authored: Fri Aug 19 16:39:28 2016 -0700 Committer: Thomas D'Silva <tdsi...@salesforce.com> Committed: Tue Aug 23 23:00:45 2016 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/IndexToolIT.java | 1 - .../apache/phoenix/execute/MutationState.java | 6 +- .../query/ConnectionQueryServicesImpl.java | 5 +- .../query/ConnectionlessQueryServicesImpl.java | 4 +- .../org/apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 3 + .../apache/phoenix/schema/MetaDataClient.java | 25 +- .../apache/phoenix/schema/PMetaDataCache.java | 221 +++++++++++++++ .../apache/phoenix/schema/PMetaDataImpl.java | 268 +++---------------- .../org/apache/phoenix/schema/PTableImpl.java | 29 +- .../org/apache/phoenix/schema/PTableRef.java | 56 ++-- .../apache/phoenix/schema/PTableRefFactory.java | 52 ++++ .../apache/phoenix/schema/PTableRefImpl.java | 39 +++ .../phoenix/schema/SerializedPTableRef.java | 47 ++++ .../schema/SerializedPTableRefFactory.java | 37 +++ .../phoenix/schema/PMetaDataImplTest.java | 34 ++- 16 files changed, 532 insertions(+), 297 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index c66fea3..16db876 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -37,7 +37,6 @@ import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.mapreduce.index.IndexTool; -import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.PropertiesUtil; http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 7a9282c..e7e6aa7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -680,7 +680,7 @@ public class MutationState implements SQLCloseable { return Iterators.emptyIterator(); } Long scn = connection.getSCN(); - final long timestamp = (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? 
HConstants.LATEST_TIMESTAMP : scn); + final long timestamp = getMutationTimestamp(tableTimestamp, scn); return new Iterator<Pair<byte[],List<Mutation>>>() { private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next(); private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init(); @@ -726,6 +726,10 @@ public class MutationState implements SQLCloseable { }; } + + public static long getMutationTimestamp(final Long tableTimestamp, Long scn) { + return (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn); + } /** * Validates that the meta data is valid against the server meta data if we haven't yet done so. http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 7a57103..524067d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -32,6 +32,7 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS; +import static org.apache.phoenix.util.UpgradeUtil.getUpgradeSnapshotName; import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0; import java.io.IOException; @@ -288,9 +289,7 @@ public class 
ConnectionQueryServicesImpl extends DelegateQueryServices implement }); private PMetaData newEmptyMetaData() { - long maxSizeBytes = props.getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, - QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE); - return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes)); + return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps())); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 25aca74..560b5d9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -144,9 +144,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple } private PMetaData newEmptyMetaData() { - long maxSizeBytes = getProps().getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, - QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE); - return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes); + return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 22fa45a..d7c7c62 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -223,6 +223,8 @@ public interface QueryServices extends SQLCloseable { public static final String LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.limited.query.serial.threshold"; public static final String INDEX_ASYNC_BUILD_ENABLED = "phoenix.index.async.build.enabled"; + + public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding"; /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 83347c8..d874860 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -90,6 +90,7 @@ import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.phoenix.schema.PTableRefFactory; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -255,6 +256,8 @@ public class QueryServicesOptions { public static final float DEFAULT_LIMITED_QUERY_SERIAL_THRESHOLD = 0.2f; public static final boolean DEFAULT_INDEX_ASYNC_BUILD_ENABLED = true; + + public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString(); @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new 
HashSet<String>() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 7f97f4a..efe60ac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -3147,7 +3147,7 @@ public class MetaDataClient { } } - private String dropColumnMutations(PTable table, List<PColumn> columnsToDrop, List<Mutation> tableMetaData) throws SQLException { + private String dropColumnMutations(PTable table, List<PColumn> columnsToDrop) throws SQLException { String tenantId = connection.getTenantId() == null ? "" : connection.getTenantId().getString(); String schemaName = table.getSchemaName().getString(); String tableName = table.getTableName().getString(); @@ -3263,7 +3263,9 @@ public class MetaDataClient { columnsToDrop.add(new ColumnRef(columnRef.getTableRef(), columnToDrop.getPosition())); } - dropColumnMutations(table, tableColumnsToDrop, tableMetaData); + dropColumnMutations(table, tableColumnsToDrop); + boolean removedIndexTableOrColumn=false; + Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null; for (PTable index : table.getIndexes()) { IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection); // get the columns required for the index pk @@ -3278,6 +3280,7 @@ public class MetaDataClient { if (index.getViewIndexId()==null) indexesToDrop.add(new TableRef(index)); connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, index.getName().getString()), index.getParentName() == null ? 
null : index.getParentName().getString(), index.getTimeStamp()); + removedIndexTableOrColumn = true; } else if (coveredColumns.contains(columnToDropRef)) { String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop); @@ -3285,15 +3288,18 @@ public class MetaDataClient { indexColumnsToDrop.add(indexColumn); // add the index column to be dropped so that we actually delete the column values columnsToDrop.add(new ColumnRef(new TableRef(index), indexColumn.getPosition())); + removedIndexTableOrColumn = true; } } if(!indexColumnsToDrop.isEmpty()) { - incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null, null); - dropColumnMutations(index, indexColumnsToDrop, tableMetaData); + long indexTableSeqNum = incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null, null); + dropColumnMutations(index, indexColumnsToDrop); + long clientTimestamp = MutationState.getMutationTimestamp(timeStamp, connection.getSCN()); + connection.removeColumn(tenantId, index.getName().getString(), + indexColumnsToDrop, clientTimestamp, indexTableSeqNum, + TransactionUtil.getResolvedTimestamp(connection, index.isTransactional(), clientTimestamp)); } - } - Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null; tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); @@ -3348,8 +3354,11 @@ public class MetaDataClient { // If we've done any index metadata updates, don't bother trying to update // client-side cache as it would be too painful. Just let it pull it over from // the server when needed. 
- if (tableColumnsToDrop.size() > 0 && indexesToDrop.isEmpty()) { - connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result)); + if (tableColumnsToDrop.size() > 0) { + if (removedIndexTableOrColumn) + connection.removeTable(tenantId, tableName, table.getParentName() == null ? null : table.getParentName().getString(), table.getTimeStamp()); + else + connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result)); } // If we have a VIEW, then only delete the metadata, and leave the table data alone if (table.getType() != PTableType.VIEW) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataCache.java new file mode 100644 index 0000000..9992adb --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataCache.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; + +import org.apache.phoenix.parse.PFunction; +import org.apache.phoenix.parse.PSchema; +import org.apache.phoenix.util.TimeKeeper; + +import com.google.common.collect.Maps; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.primitives.Longs; + +class PMetaDataCache implements Cloneable { + private static final int MIN_REMOVAL_SIZE = 3; + private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() { + @Override + public int compare(PTableRef tableRef1, PTableRef tableRef2) { + return Longs.compare(tableRef1.getLastAccessTime(), tableRef2.getLastAccessTime()); + } + }; + private static final MinMaxPriorityQueue.Builder<PTableRef> BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR); + + private long currentByteSize; + private final long maxByteSize; + private final int expectedCapacity; + private final TimeKeeper timeKeeper; + private final PTableRefFactory tableRefFactory; + + private final Map<PTableKey,PTableRef> tables; + final Map<PTableKey,PFunction> functions; + final Map<PTableKey,PSchema> schemas; + + private static Map<PTableKey,PTableRef> newMap(int expectedCapacity) { + // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time + // safely across multiple threads (as the underlying collection is not thread safe). + // Instead, we track access time and prune it based on the copy we've made. 
+ return Maps.newHashMapWithExpectedSize(expectedCapacity); + } + + private static Map<PTableKey,PFunction> newFunctionMap(int expectedCapacity) { + // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time + // safely across multiple threads (as the underlying collection is not thread safe). + // Instead, we track access time and prune it based on the copy we've made. + return Maps.newHashMapWithExpectedSize(expectedCapacity); + } + + private static Map<PTableKey,PSchema> newSchemaMap(int expectedCapacity) { + // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time + // safely across multiple threads (as the underlying collection is not thread safe). + // Instead, we track access time and prune it based on the copy we've made. + return Maps.newHashMapWithExpectedSize(expectedCapacity); + } + + private Map<PTableKey,PTableRef> cloneMap(Map<PTableKey,PTableRef> tables, int expectedCapacity) { + Map<PTableKey,PTableRef> newTables = newMap(Math.max(tables.size(),expectedCapacity)); + // Copy value so that access time isn't changing anymore + for (PTableRef tableAccess : tables.values()) { + newTables.put(tableAccess.getTable().getKey(), tableRefFactory.makePTableRef(tableAccess)); + } + return newTables; + } + + private static Map<PTableKey, PSchema> cloneSchemaMap(Map<PTableKey, PSchema> schemas, int expectedCapacity) { + Map<PTableKey, PSchema> newSchemas = newSchemaMap(Math.max(schemas.size(), expectedCapacity)); + // Copy value so that access time isn't changing anymore + for (PSchema schema : schemas.values()) { + newSchemas.put(schema.getSchemaKey(), new PSchema(schema)); + } + return newSchemas; + } + + private static Map<PTableKey,PFunction> cloneFunctionsMap(Map<PTableKey,PFunction> functions, int expectedCapacity) { + Map<PTableKey,PFunction> newFunctions = newFunctionMap(Math.max(functions.size(),expectedCapacity)); + for (PFunction functionAccess : functions.values()) { + 
newFunctions.put(functionAccess.getKey(), new PFunction(functionAccess)); + } + return newFunctions; + } + + PMetaDataCache(PMetaDataCache toClone) { + this.tableRefFactory = toClone.tableRefFactory; + this.timeKeeper = toClone.timeKeeper; + this.maxByteSize = toClone.maxByteSize; + this.currentByteSize = toClone.currentByteSize; + this.expectedCapacity = toClone.expectedCapacity; + this.tables = cloneMap(toClone.tables, expectedCapacity); + this.functions = cloneFunctionsMap(toClone.functions, expectedCapacity); + this.schemas = cloneSchemaMap(toClone.schemas, expectedCapacity); + } + + public PMetaDataCache(int initialCapacity, long maxByteSize, TimeKeeper timeKeeper, PTableRefFactory tableRefFactory) { + this.currentByteSize = 0; + this.maxByteSize = maxByteSize; + this.expectedCapacity = initialCapacity; + this.tables = newMap(this.expectedCapacity); + this.functions = newFunctionMap(this.expectedCapacity); + this.timeKeeper = timeKeeper; + this.schemas = newSchemaMap(this.expectedCapacity); + this.tableRefFactory = tableRefFactory; + } + + public PTableRef get(PTableKey key) { + PTableRef tableAccess = this.tables.get(key); + if (tableAccess == null) { + return null; + } + tableAccess.setLastAccessTime(timeKeeper.getCurrentTime()); + return tableAccess; + } + + @Override + public PMetaDataCache clone() { + return new PMetaDataCache(this); + } + + /** + * Used when the cache is growing past its max size to clone in a single pass. + * Removes least recently used tables to get size of cache below its max size by + * the overage amount. 
+ */ + public PMetaDataCache cloneMinusOverage(long overage) { + assert(overage > 0); + int nToRemove = Math.max(MIN_REMOVAL_SIZE, (int)Math.ceil((currentByteSize-maxByteSize) / ((double)currentByteSize / size())) + 1); + MinMaxPriorityQueue<PTableRef> toRemove = BUILDER.expectedSize(nToRemove).create(); + PMetaDataCache newCache = new PMetaDataCache(this.size(), this.maxByteSize, this.timeKeeper, this.tableRefFactory); + + long toRemoveBytes = 0; + // Add to new cache, but track references to remove when done + // to bring cache at least overage amount below it's max size. + for (PTableRef tableRef : this.tables.values()) { + newCache.put(tableRef.getTable().getKey(), tableRefFactory.makePTableRef(tableRef)); + toRemove.add(tableRef); + toRemoveBytes += tableRef.getEstimatedSize(); + while (toRemoveBytes - toRemove.peekLast().getEstimatedSize() >= overage) { + PTableRef removedRef = toRemove.removeLast(); + toRemoveBytes -= removedRef.getEstimatedSize(); + } + } + for (PTableRef toRemoveRef : toRemove) { + newCache.remove(toRemoveRef.getTable().getKey()); + } + return newCache; + } + + PTable put(PTableKey key, PTableRef ref) { + currentByteSize += ref.getEstimatedSize(); + PTableRef oldTableAccess = this.tables.put(key, ref); + PTable oldTable = null; + if (oldTableAccess != null) { + currentByteSize -= oldTableAccess.getEstimatedSize(); + oldTable = oldTableAccess.getTable(); + } + return oldTable; + } + + public long getAge(PTableRef ref) { + return timeKeeper.getCurrentTime() - ref.getCreateTime(); + } + + public PTable remove(PTableKey key) { + PTableRef value = this.tables.remove(key); + if (value == null) { + return null; + } + currentByteSize -= value.getEstimatedSize(); + return value.getTable(); + } + + public Iterator<PTable> iterator() { + final Iterator<PTableRef> iterator = this.tables.values().iterator(); + return new Iterator<PTable>() { + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public PTable next() 
{ + return iterator.next().getTable(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + public int size() { + return this.tables.size(); + } + + public long getCurrentSize() { + return this.currentByteSize; + } + + public long getMaxSize() { + return this.maxByteSize; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java index 5ffacca..7a78006 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java @@ -18,253 +18,50 @@ package org.apache.phoenix.schema; import java.sql.SQLException; -import java.util.Comparator; import java.util.Iterator; import java.util.List; -import java.util.Map; import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TimeKeeper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.primitives.Longs; /** - * - * Client-side cache of MetaData. Not thread safe, but meant to be used - * in a copy-on-write fashion. 
Internally uses a LinkedHashMap that evicts - * the oldest entries when size grows beyond the maxSize specified at - * create time. - * + * Client-side cache of MetaData, not thread safe. Internally uses a LinkedHashMap that evicts the + * oldest entries when size grows beyond the maxSize specified at create time. */ public class PMetaDataImpl implements PMetaData { - private static final Logger logger = LoggerFactory.getLogger(PMetaDataImpl.class); - static class PMetaDataCache implements Cloneable { - private static final int MIN_REMOVAL_SIZE = 3; - private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() { - @Override - public int compare(PTableRef tableRef1, PTableRef tableRef2) { - return Longs.compare(tableRef1.getLastAccessTime(), tableRef2.getLastAccessTime()); - } - }; - private static final MinMaxPriorityQueue.Builder<PTableRef> BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR); - - private long currentByteSize; - private final long maxByteSize; - private final int expectedCapacity; - private final TimeKeeper timeKeeper; - - private final Map<PTableKey,PTableRef> tables; - private final Map<PTableKey,PFunction> functions; - private final Map<PTableKey,PSchema> schemas; - - private static Map<PTableKey,PTableRef> newMap(int expectedCapacity) { - // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time - // safely across multiple threads (as the underlying collection is not thread safe). - // Instead, we track access time and prune it based on the copy we've made. - return Maps.newHashMapWithExpectedSize(expectedCapacity); - } - - private static Map<PTableKey,PFunction> newFunctionMap(int expectedCapacity) { - // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time - // safely across multiple threads (as the underlying collection is not thread safe). - // Instead, we track access time and prune it based on the copy we've made. 
- return Maps.newHashMapWithExpectedSize(expectedCapacity); - } - - private static Map<PTableKey,PSchema> newSchemaMap(int expectedCapacity) { - // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time - // safely across multiple threads (as the underlying collection is not thread safe). - // Instead, we track access time and prune it based on the copy we've made. - return Maps.newHashMapWithExpectedSize(expectedCapacity); - } - - private static Map<PTableKey,PTableRef> cloneMap(Map<PTableKey,PTableRef> tables, int expectedCapacity) { - Map<PTableKey,PTableRef> newTables = newMap(Math.max(tables.size(),expectedCapacity)); - // Copy value so that access time isn't changing anymore - for (PTableRef tableAccess : tables.values()) { - newTables.put(tableAccess.getTable().getKey(), new PTableRef(tableAccess)); - } - return newTables; - } - - private static Map<PTableKey, PSchema> cloneSchemaMap(Map<PTableKey, PSchema> schemas, int expectedCapacity) { - Map<PTableKey, PSchema> newSchemas = newSchemaMap(Math.max(schemas.size(), expectedCapacity)); - // Copy value so that access time isn't changing anymore - for (PSchema schema : schemas.values()) { - newSchemas.put(schema.getSchemaKey(), new PSchema(schema)); - } - return newSchemas; - } - - private static Map<PTableKey,PFunction> cloneFunctionsMap(Map<PTableKey,PFunction> functions, int expectedCapacity) { - Map<PTableKey,PFunction> newFunctions = newFunctionMap(Math.max(functions.size(),expectedCapacity)); - for (PFunction functionAccess : functions.values()) { - newFunctions.put(functionAccess.getKey(), new PFunction(functionAccess)); - } - return newFunctions; - } - - private PMetaDataCache(PMetaDataCache toClone) { - this.timeKeeper = toClone.timeKeeper; - this.maxByteSize = toClone.maxByteSize; - this.currentByteSize = toClone.currentByteSize; - this.expectedCapacity = toClone.expectedCapacity; - this.tables = cloneMap(toClone.tables, expectedCapacity); - this.functions = 
cloneFunctionsMap(toClone.functions, expectedCapacity); - this.schemas = cloneSchemaMap(toClone.schemas, expectedCapacity); - } - - public PMetaDataCache(int initialCapacity, long maxByteSize, TimeKeeper timeKeeper) { - this.currentByteSize = 0; - this.maxByteSize = maxByteSize; - this.expectedCapacity = initialCapacity; - this.tables = newMap(this.expectedCapacity); - this.functions = newFunctionMap(this.expectedCapacity); - this.timeKeeper = timeKeeper; - this.schemas = newSchemaMap(this.expectedCapacity); - } - - public PTableRef get(PTableKey key) { - PTableRef tableAccess = this.tables.get(key); - if (tableAccess == null) { - return null; - } - tableAccess.setLastAccessTime(timeKeeper.getCurrentTime()); - return tableAccess; - } - - @Override - public PMetaDataCache clone() { - return new PMetaDataCache(this); - } - - /** - * Used when the cache is growing past its max size to clone in a single pass. - * Removes least recently used tables to get size of cache below its max size by - * the overage amount. - */ - public PMetaDataCache cloneMinusOverage(long overage) { - assert(overage > 0); - int nToRemove = Math.max(MIN_REMOVAL_SIZE, (int)Math.ceil((currentByteSize-maxByteSize) / ((double)currentByteSize / size())) + 1); - MinMaxPriorityQueue<PTableRef> toRemove = BUILDER.expectedSize(nToRemove).create(); - PMetaDataCache newCache = new PMetaDataCache(this.size(), this.maxByteSize, this.timeKeeper); - - long toRemoveBytes = 0; - // Add to new cache, but track references to remove when done - // to bring cache at least overage amount below it's max size. 
- for (PTableRef tableRef : this.tables.values()) { - newCache.put(tableRef.getTable().getKey(), new PTableRef(tableRef)); - toRemove.add(tableRef); - toRemoveBytes += tableRef.getEstSize(); - while (toRemoveBytes - toRemove.peekLast().getEstSize() >= overage) { - PTableRef removedRef = toRemove.removeLast(); - toRemoveBytes -= removedRef.getEstSize(); - } - } - for (PTableRef toRemoveRef : toRemove) { - newCache.remove(toRemoveRef.getTable().getKey()); - } - return newCache; - } - - private PTable put(PTableKey key, PTableRef ref) { - currentByteSize += ref.getEstSize(); - PTableRef oldTableAccess = this.tables.put(key, ref); - PTable oldTable = null; - if (oldTableAccess != null) { - currentByteSize -= oldTableAccess.getEstSize(); - oldTable = oldTableAccess.getTable(); - } - return oldTable; - } - - public PTable put(PTableKey key, PTable value, long resolvedTime) { - return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), resolvedTime)); - } - - public PTable putDuplicate(PTableKey key, PTable value, long resolvedTime) { - return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0, resolvedTime)); - } - - public long getAge(PTableRef ref) { - return timeKeeper.getCurrentTime() - ref.getCreateTime(); - } - - public PTable remove(PTableKey key) { - PTableRef value = this.tables.remove(key); - if (value == null) { - return null; - } - currentByteSize -= value.getEstSize(); - return value.getTable(); - } - - public Iterator<PTable> iterator() { - final Iterator<PTableRef> iterator = this.tables.values().iterator(); - return new Iterator<PTable>() { - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public PTable next() { - return iterator.next().getTable(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }; - } - - public int size() { - return this.tables.size(); - } - - public long getCurrentSize() { - return this.currentByteSize; - } - - public long 
getMaxSize() { - return this.maxByteSize; - } - } - - private PMetaDataCache metaData; - @VisibleForTesting - public PMetaDataCache getMetaData() { - return metaData; - } + private PMetaDataCache metaData; + private final TimeKeeper timeKeeper; + private final PTableRefFactory tableRefFactory; - public PMetaDataImpl(int initialCapacity, long maxByteSize) { - this.metaData = new PMetaDataCache(initialCapacity, maxByteSize, TimeKeeper.SYSTEM); + public PMetaDataImpl(int initialCapacity, ReadOnlyProps props) { + this(initialCapacity, TimeKeeper.SYSTEM, props); } - public PMetaDataImpl(int initialCapacity, long maxByteSize, TimeKeeper timeKeeper) { - this.metaData = new PMetaDataCache(initialCapacity, maxByteSize, timeKeeper); + public PMetaDataImpl(int initialCapacity, TimeKeeper timeKeeper, ReadOnlyProps props) { + this(new PMetaDataCache(initialCapacity, props.getLong( + QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE), timeKeeper, + PTableRefFactory.getFactory(props)), timeKeeper, PTableRefFactory.getFactory(props)); } - private PMetaDataImpl(PMetaDataCache metaData) { + private PMetaDataImpl(PMetaDataCache metaData, TimeKeeper timeKeeper, PTableRefFactory tableRefFactory) { + this.timeKeeper = timeKeeper; this.metaData = metaData; + this.tableRefFactory = tableRefFactory; } - + @Override public PMetaDataImpl clone() { - return new PMetaDataImpl(new PMetaDataCache(this.metaData)); + return new PMetaDataImpl(new PMetaDataCache(this.metaData), this.timeKeeper, this.tableRefFactory); } @Override @@ -292,18 +89,20 @@ public class PMetaDataImpl implements PMetaData { @Override public void updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException { - metaData.putDuplicate(table.getKey(), table, resolvedTimestamp); + metaData.put(table.getKey(), tableRefFactory.makePTableRef(table, this.timeKeeper.getCurrentTime(), resolvedTimestamp)); } @Override public void addTable(PTable table, 
long resolvedTime) throws SQLException { + PTableRef tableRef = tableRefFactory.makePTableRef(table, this.timeKeeper.getCurrentTime(), resolvedTime); int netGain = 0; PTableKey key = table.getKey(); PTableRef oldTableRef = metaData.get(key); if (oldTableRef != null) { - netGain -= oldTableRef.getEstSize(); + netGain -= oldTableRef.getEstimatedSize(); } PTable newParentTable = null; + PTableRef newParentTableRef = null; long parentResolvedTimestamp = resolvedTime; if (table.getParentName() != null) { // Upsert new index table into parent data table list String parentName = table.getParentName().getString(); @@ -321,25 +120,26 @@ public class PMetaDataImpl implements PMetaData { } } newIndexes.add(table); - netGain -= oldParentRef.getEstSize(); + netGain -= oldParentRef.getEstimatedSize(); newParentTable = PTableImpl.makePTable(oldParentRef.getTable(), table.getTimeStamp(), newIndexes); - netGain += newParentTable.getEstimatedSize(); + newParentTableRef = tableRefFactory.makePTableRef(newParentTable, this.timeKeeper.getCurrentTime(), parentResolvedTimestamp); + netGain += newParentTableRef.getEstimatedSize(); } } if (newParentTable == null) { // Don't count in gain if we found a parent table, as its accounted for in newParentTable - netGain += table.getEstimatedSize(); + netGain += tableRef.getEstimatedSize(); } long overage = metaData.getCurrentSize() + netGain - metaData.getMaxSize(); metaData = overage <= 0 ? 
metaData : metaData.cloneMinusOverage(overage); if (newParentTable != null) { // Upsert new index table into parent data table list - metaData.put(newParentTable.getKey(), newParentTable, parentResolvedTimestamp); - metaData.putDuplicate(table.getKey(), table, resolvedTime); + metaData.put(newParentTable.getKey(), newParentTableRef); + metaData.put(table.getKey(), tableRef); } else { - metaData.put(table.getKey(), table, resolvedTime); + metaData.put(table.getKey(), tableRef); } for (PTable index : table.getIndexes()) { - metaData.putDuplicate(index.getKey(), index, resolvedTime); + metaData.put(index.getKey(), tableRefFactory.makePTableRef(index, this.timeKeeper.getCurrentTime(), resolvedTime)); } } @@ -401,7 +201,7 @@ public class PMetaDataImpl implements PMetaData { parentTableRef.getTable(), tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.getTable().getTimeStamp() : tableTimeStamp, newIndexes); - metaData.put(parentTable.getKey(), parentTable, parentTableRef.getResolvedTimeStamp()); + metaData.put(parentTable.getKey(), tableRefFactory.makePTableRef(parentTable, this.timeKeeper.getCurrentTime(), parentTableRef.getResolvedTimeStamp())); break; } } @@ -444,7 +244,7 @@ public class PMetaDataImpl implements PMetaData { table = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, columns); } - tables.put(table.getKey(), table, resolvedTime); + tables.put(table.getKey(), tableRefFactory.makePTableRef(table, this.timeKeeper.getCurrentTime(), resolvedTime)); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 92c49f9..c485a30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -77,7 +77,6 @@ import com.google.common.collect.Maps; * storing data in a single column (ColumnLayout.SINGLE) or in * multiple columns (ColumnLayout.MULTI). * - * TODO add hashCode and equal methods to check equality of two PTableImpl objects. * @since 0.1 */ public class PTableImpl implements PTable { @@ -1073,9 +1072,9 @@ public class PTableImpl implements PTable { List<PName> physicalNames = Collections.emptyList(); if (tableType == PTableType.VIEW) { viewType = ViewType.fromSerializedValue(table.getViewType().toByteArray()[0]); - if(table.hasViewStatement()){ - viewStatement = (String) PVarchar.INSTANCE.toObject(table.getViewStatement().toByteArray()); - } + } + if(table.hasViewStatement()){ + viewStatement = (String) PVarchar.INSTANCE.toObject(table.getViewStatement().toByteArray()); } if (tableType == PTableType.VIEW || viewIndexId != null) { physicalNames = Lists.newArrayListWithExpectedSize(table.getPhysicalNamesCount()); @@ -1181,6 +1180,8 @@ public class PTableImpl implements PTable { builder.setTransactional(table.isTransactional()); if(table.getType() == PTableType.VIEW){ builder.setViewType(ByteStringer.wrap(new byte[]{table.getViewType().getSerializedValue()})); + } + if(table.getViewStatement()!=null){ builder.setViewStatement(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getViewStatement()))); } if(table.getType() == PTableType.VIEW || table.getViewIndexId() != null){ @@ -1244,4 +1245,24 @@ public class PTableImpl implements PTable { public boolean isAppendOnlySchema() { return isAppendOnlySchema; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((key == null) ? 
0 : key.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + PTableImpl other = (PTableImpl) obj; + if (key == null) { + if (other.key != null) return false; + } else if (!key.equals(other.key)) return false; + return true; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java index c4bc510..0a601b0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java @@ -17,28 +17,19 @@ */ package org.apache.phoenix.schema; -public class PTableRef { - private final PTable table; - private final int estSize; - private final long createTime; - private final long resolvedTimeStamp; - private volatile long lastAccessTime; +public abstract class PTableRef { - public PTableRef(PTable table, long lastAccessTime, int estSize, long resolvedTime) { - this.table = table; + protected final int estSize; + protected final long createTime; + protected final long resolvedTimeStamp; + protected volatile long lastAccessTime; + + public PTableRef(long lastAccessTime, long resolvedTime, int estimatedSize) { this.lastAccessTime = lastAccessTime; - this.estSize = estSize; + this.estSize = estimatedSize; this.resolvedTimeStamp = resolvedTime; this.createTime = lastAccessTime; } - - public PTableRef(PTable table, long lastAccessTime, long resolvedTime) { - this (table, lastAccessTime, table.getEstimatedSize(), resolvedTime); - } - - public PTableRef(PTableRef tableRef) { - this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize, 
tableRef.resolvedTimeStamp); - } /** * Tracks how long this entry has been in the cache @@ -48,23 +39,22 @@ public class PTableRef { return createTime; } - public PTable getTable() { - return table; - } + public abstract PTable getTable(); - public long getResolvedTimeStamp() { - return resolvedTimeStamp; - } - - public int getEstSize() { - return estSize; - } + public long getResolvedTimeStamp() { + return resolvedTimeStamp; + } + + public int getEstimatedSize() { + return estSize; + } - public long getLastAccessTime() { - return lastAccessTime; - } + public long getLastAccessTime() { + return lastAccessTime; + } - public void setLastAccessTime(long lastAccessTime) { - this.lastAccessTime = lastAccessTime; - } + public void setLastAccessTime(long lastAccessTime) { + this.lastAccessTime = lastAccessTime; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefFactory.java new file mode 100644 index 0000000..14eb235 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ReadOnlyProps; + +public class PTableRefFactory { + public PTableRef makePTableRef(PTable table, long lastAccessTime, long resolvedTime) { + return new PTableRefImpl(table, lastAccessTime, resolvedTime, table.getEstimatedSize()); + } + + public PTableRef makePTableRef(PTableRef tableRef) { + return new PTableRefImpl(tableRef); + } + + private static final PTableRefFactory INSTANCE = new PTableRefFactory(); + + public static enum Encoding { + OBJECT, PROTOBUF + }; + + public static PTableRefFactory getFactory(ReadOnlyProps props) { + String encodingEnumString = + props.get(QueryServices.CLIENT_CACHE_ENCODING, + QueryServicesOptions.DEFAULT_CLIENT_CACHE_ENCODING); + Encoding encoding = Encoding.valueOf(encodingEnumString.toUpperCase()); + switch (encoding) { + case PROTOBUF: + return SerializedPTableRefFactory.getFactory(); + case OBJECT: + default: + return INSTANCE; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefImpl.java new file mode 100644 index 0000000..ffc5c2b --- /dev/null +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefImpl.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + + +public class PTableRefImpl extends PTableRef { + + private final PTable table; + + public PTableRefImpl(PTable table, long lastAccessTime, long resolvedTime, int estimatedSize) { + super(lastAccessTime, resolvedTime, estimatedSize); + this.table = table; + } + + public PTableRefImpl(PTableRef tableRef) { + super(tableRef.getLastAccessTime(), tableRef.getResolvedTimeStamp(), tableRef.getEstimatedSize()); + this.table = tableRef.getTable(); + } + + @Override + public PTable getTable() { + return table; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRef.java new file mode 100644 index 0000000..a57fc72 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRef.java @@ -0,0 +1,47 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + +import java.io.IOException; + +import org.apache.phoenix.coprocessor.generated.PTableProtos; + +public class SerializedPTableRef extends PTableRef { + + private final byte[] tableBytes; + + public SerializedPTableRef(byte[] tableBytes, long lastAccessTime, long resolvedTime, int estimatedSize) { + super(lastAccessTime, resolvedTime, tableBytes.length); + this.tableBytes = tableBytes; + } + + public SerializedPTableRef(PTableRef tableRef) { + super(tableRef.getLastAccessTime(), tableRef.getResolvedTimeStamp(), tableRef.getEstimatedSize()); + this.tableBytes = ((SerializedPTableRef)tableRef).tableBytes; + } + + @Override + public PTable getTable() { + try { + return PTableImpl.createFromProto(PTableProtos.PTable.parseFrom(tableBytes)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRefFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRefFactory.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRefFactory.java new file mode 100644 index 0000000..5da1fd6 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRefFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.schema; + +class SerializedPTableRefFactory extends PTableRefFactory { + @Override + public PTableRef makePTableRef(PTable table, long lastAccessTime, long resolvedTime) { + byte[] serializedBytes = PTableImpl.toProto(table).toByteArray(); + return new SerializedPTableRef(serializedBytes, lastAccessTime, resolvedTime, table.getEstimatedSize()); + } + + @Override + public PTableRef makePTableRef(PTableRef tableRef) { + return new SerializedPTableRef(tableRef); + } + + private static final SerializedPTableRefFactory INSTANCE = new SerializedPTableRefFactory(); + + public static PTableRefFactory getFactory() { + return INSTANCE; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java index ef88c8c..a5660db 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java @@ -21,12 +21,16 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import java.sql.SQLException; +import java.util.Map; import java.util.Set; import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.TimeKeeper; import org.junit.Test; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class PMetaDataImplTest { @@ -72,9 +76,11 @@ public class PMetaDataImplTest { @Test public void testEviction() throws Exception { - long maxSize = 10; TestTimeKeeper timeKeeper = new TestTimeKeeper(); - PMetaData metaData = new PMetaDataImpl(5, 
maxSize, timeKeeper); + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "10"); + props.put(QueryServices.CLIENT_CACHE_ENCODING, "object"); + PMetaData metaData = new PMetaDataImpl(5, timeKeeper, new ReadOnlyProps(props)); addToTable(metaData, "a", 5, timeKeeper); assertEquals(1, metaData.size()); addToTable(metaData, "b", 4, timeKeeper); @@ -116,9 +122,11 @@ public class PMetaDataImplTest { @Test public void shouldNotEvictMoreEntriesThanNecessary() throws Exception { - long maxSize = 5; TestTimeKeeper timeKeeper = new TestTimeKeeper(); - PMetaData metaData = new PMetaDataImpl(5, maxSize, timeKeeper); + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "5"); + props.put(QueryServices.CLIENT_CACHE_ENCODING, "object"); + PMetaData metaData = new PMetaDataImpl(5, timeKeeper, new ReadOnlyProps(props)); addToTable(metaData, "a", 1, timeKeeper); assertEquals(1, metaData.size()); addToTable(metaData, "b", 1, timeKeeper); @@ -136,9 +144,11 @@ public class PMetaDataImplTest { @Test public void shouldAlwaysKeepAtLeastOneEntryEvenIfTooLarge() throws Exception { - long maxSize = 5; TestTimeKeeper timeKeeper = new TestTimeKeeper(); - PMetaData metaData = new PMetaDataImpl(5, maxSize, timeKeeper); + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "5"); + props.put(QueryServices.CLIENT_CACHE_ENCODING, "object"); + PMetaData metaData = new PMetaDataImpl(5, timeKeeper, new ReadOnlyProps(props)); addToTable(metaData, "a", 1, timeKeeper); assertEquals(1, metaData.size()); addToTable(metaData, "b", 1, timeKeeper); @@ -157,9 +167,11 @@ public class PMetaDataImplTest { @Test public void shouldAlwaysKeepOneEntryIfMaxSizeIsZero() throws Exception { - long maxSize = 0; TestTimeKeeper timeKeeper = new TestTimeKeeper(); - PMetaData metaData = new 
PMetaDataImpl(0, maxSize, timeKeeper); + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "0"); + props.put(QueryServices.CLIENT_CACHE_ENCODING, "object"); + PMetaData metaData = new PMetaDataImpl(5, timeKeeper, new ReadOnlyProps(props)); addToTable(metaData, "a", 1, timeKeeper); assertEquals(1, metaData.size()); addToTable(metaData, "b", 1, timeKeeper); @@ -178,9 +190,11 @@ public class PMetaDataImplTest { @Test public void testAge() throws Exception { - long maxSize = 10; TestTimeKeeper timeKeeper = new TestTimeKeeper(); - PMetaData metaData = new PMetaDataImpl(5, maxSize, timeKeeper); + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "10"); + props.put(QueryServices.CLIENT_CACHE_ENCODING, "object"); + PMetaData metaData = new PMetaDataImpl(5, timeKeeper, new ReadOnlyProps(props)); String tableName = "a"; addToTable(metaData, tableName, 1, timeKeeper); PTableRef aTableRef = metaData.getTableRef(new PTableKey(null,tableName));