Repository: phoenix Updated Branches: refs/heads/4.0 a18862d06 -> 5cdc938e8
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java new file mode 100644 index 0000000..17b5825 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.schema.stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorException; +import org.apache.hadoop.hbase.coprocessor.CoprocessorService; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.generated.StatCollectorProtos; +import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest; +import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse; +import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse.Builder; +import 
org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.schema.PhoenixArray; +import org.apache.phoenix.util.TimeKeeper; + +import com.google.common.collect.Lists; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +/** + * An endpoint implementation that allows to collect the stats for a given region and groups the stat per family. This is also an + * RegionObserver that collects the information on compaction also. The user would be allowed to invoke this endpoint and thus populate the + * Phoenix stats table with the max key, min key and guide posts for the given region. The stats can be consumed by the stats associated + * with every PTable and the same can be used to parallelize the queries + */ +public class StatisticsCollector extends BaseRegionObserver implements CoprocessorService, Coprocessor, + StatisticsTracker, StatCollectService.Interface { + + public static void addToTable(HTableDescriptor desc) throws IOException { + desc.addCoprocessor(StatisticsCollector.class.getName()); + } + + private Map<String, byte[]> minMap = new ConcurrentHashMap<String, byte[]>(); + private Map<String, byte[]> maxMap = new ConcurrentHashMap<String, byte[]>(); + private long guidepostDepth; + private long byteCount = 0; + private Map<String, List<byte[]>> guidePostsMap = new ConcurrentHashMap<String, List<byte[]>>(); + private Map<ImmutableBytesPtr, Boolean> familyMap = new ConcurrentHashMap<ImmutableBytesPtr, Boolean>(); + private RegionCoprocessorEnvironment env; + protected StatisticsTable stats; + // Ensures that either analyze or compaction happens at any point of time. 
+ private ReentrantLock lock = new ReentrantLock(); + private static final Log LOG = LogFactory.getLog(StatisticsCollector.class); + + @Override + public void collectStat(RpcController controller, StatCollectRequest request, RpcCallback<StatCollectResponse> done) { + HRegion region = env.getRegion(); + boolean heldLock = false; + int count = 0; + Builder newBuilder = StatCollectResponse.newBuilder(); + try { + if (lock.tryLock()) { + heldLock = true; + // Clear all old stats + clear(); + Scan scan = createScan(env.getConfiguration()); + if (request.hasStartRow()) { + scan.setStartRow(request.getStartRow().toByteArray()); + } + if (request.hasStopRow()) { + scan.setStopRow(request.getStopRow().toByteArray()); + } + RegionScanner scanner = null; + try { + scanner = region.getScanner(scan); + count = scanRegion(scanner, count); + } catch (IOException e) { + LOG.error(e); + ResponseConverter.setControllerException(controller, e); + } finally { + if (scanner != null) { + try { + ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + writeStatsToStatsTable(region, scanner, true, mutations, TimeKeeper.SYSTEM.getCurrentTime()); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing new stats for the region " + region.getRegionInfo()); + } + commitStats(mutations); + } catch (IOException e) { + LOG.error(e); + ResponseConverter.setControllerException(controller, e); + } finally { + clear(); + } + } + } + } + } finally { + if (heldLock) { + lock.unlock(); + } + newBuilder.setRowsScanned(count); + StatCollectResponse result = newBuilder.build(); + done.run(result); + } + } + + private void writeStatsToStatsTable(final HRegion region, + final RegionScanner scanner, boolean delete, List<Mutation> mutations, long currentTime) throws IOException { + scanner.close(); + try { + // update the statistics table + for (ImmutableBytesPtr fam : familyMap.keySet()) { + String tableName = region.getRegionInfo().getTable().getNameAsString(); + if (delete) { + if(LOG.isDebugEnabled()) { 
+ LOG.debug("Deleting the stats for the region "+region.getRegionInfo()); + } + stats.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this, + Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + } + if(LOG.isDebugEnabled()) { + LOG.debug("Adding new stats for the region "+region.getRegionInfo()); + } + stats.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, + Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + } + } catch (IOException e) { + LOG.error("Failed to update statistics table!", e); + throw e; + } + } + + private void commitStats(List<Mutation> mutations) throws IOException { + stats.commitStats(mutations); + } + + private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException { + try { + // update the statistics table + for (ImmutableBytesPtr fam : familyMap.keySet()) { + String tableName = region.getRegionInfo().getTable().getNameAsString(); + stats.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, + Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + } + } catch (IOException e) { + LOG.error("Failed to delete from statistics table!", e); + throw e; + } + } + + private int scanRegion(RegionScanner scanner, int count) throws IOException { + List<Cell> results = new ArrayList<Cell>(); + boolean hasMore = true; + while (hasMore) { + // Am getting duplicates here. 
Need to avoid that + hasMore = scanner.next(results); + updateStat(results); + count += results.size(); + results.clear(); + while (!hasMore) { + break; + } + } + return count; + } + + /** + * Update the current statistics based on the lastest batch of key-values from the underlying scanner + * + * @param results + * next batch of {@link KeyValue}s + */ + protected void updateStat(final List<Cell> results) { + for (Cell c : results) { + KeyValue kv = KeyValueUtil.ensureKeyValue(c); + updateStatistic(kv); + } + } + + @Override + public Service getService() { + return StatCollectorProtos.StatCollectService.newReflectiveService(this); + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment)env; + } else { + throw new CoprocessorException("Must be loaded on a table region!"); + } + HTableDescriptor desc = ((RegionCoprocessorEnvironment)env).getRegion().getTableDesc(); + // Get the stats table associated with the current table on which the CP is + // triggered + stats = StatisticsTable.getStatisticsTableForCoprocessor(env, desc.getName()); + guidepostDepth = env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_CONF_KEY, + QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + TableName table = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTable(); + // Close only if the table is system table + if(table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + stats.close(); + } + } + } + + @Override + public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + List<? 
extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) + throws IOException { + InternalScanner internalScan = s; + TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); + if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + boolean heldLock = false; + try { + if (lock.tryLock()) { + heldLock = true; + // See if this is for Major compaction + if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { + // this is the first CP accessed, so we need to just create a major + // compaction scanner, just + // like in the compactor + if (s == null) { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + long smallestReadPoint = store.getSmallestReadPoint(); + internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType, + smallestReadPoint, earliestPutTs); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Compaction scanner created for stats"); + } + InternalScanner scanner = getInternalScanner(c, store, internalScan, + store.getColumnFamilyName()); + if (scanner != null) { + internalScan = scanner; + } + } + } + } finally { + if (heldLock) { + lock.unlock(); + } + } + } + return internalScan; + } + + + @Override + public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegion l, HRegion r) throws IOException { + // Invoke collectStat here + HRegion region = ctx.getEnvironment().getRegion(); + TableName table = region.getRegionInfo().getTable(); + if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + if (familyMap != null) { + familyMap.clear(); + } + // Create a delete operation on the parent region + // Then write the new guide posts for individual regions + // TODO : Try making this atomic + List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3); + long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); + Configuration conf = ctx.getEnvironment().getConfiguration(); + 
if(LOG.isDebugEnabled()) { + LOG.debug("Collecting stats for the daughter region "+l.getRegionInfo()); + } + collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime); + clear(); + if(LOG.isDebugEnabled()) { + LOG.debug("Collecting stats for the daughter region "+r.getRegionInfo()); + } + collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime); + clear(); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo()); + } + commitStats(mutations); + } + } + + private void collectStatsForSplitRegions(Configuration conf, HRegion daughter, HRegion parent, boolean delete, + List<Mutation> mutations, long currentTime) throws IOException { + Scan scan = createScan(conf); + RegionScanner scanner = null; + int count = 0; + try { + scanner = daughter.getScanner(scan); + count = scanRegion(scanner, count); + } catch (IOException e) { + LOG.error(e); + throw e; + } finally { + if (scanner != null) { + try { + if (delete) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting the stats for the parent region " + parent.getRegionInfo()); + } + deleteStatsFromStatsTable(parent, mutations, currentTime); + } + writeStatsToStatsTable(daughter, scanner, false, mutations, currentTime); + } catch (IOException e) { + LOG.error(e); + throw e; + } + } + } + } + + private Scan createScan(Configuration conf) { + Scan scan = new Scan(); + scan.setCaching( + conf.getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE)); + // do not cache the blocks here + scan.setCacheBlocks(false); + return scan; + } + + protected InternalScanner getInternalScanner(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner internalScan, String family) { + return new StatisticsScanner(this, stats, c.getEnvironment().getRegion().getRegionInfo(), internalScan, + Bytes.toBytes(family)); + } + + @Override + public void clear() { + this.maxMap.clear(); + 
this.minMap.clear(); + this.guidePostsMap.clear(); + this.familyMap.clear(); + } + + @Override + public void updateStatistic(KeyValue kv) { + byte[] cf = kv.getFamily(); + familyMap.put(new ImmutableBytesPtr(cf), true); + + String fam = Bytes.toString(cf); + byte[] row = new ImmutableBytesPtr(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()) + .copyBytesIfNecessary(); + if (!minMap.containsKey(fam) && !maxMap.containsKey(fam)) { + minMap.put(fam, row); + // Ideally the max key also should be added in this case + maxMap.put(fam, row); + } else { + if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), minMap.get(fam), 0, + minMap.get(fam).length) < 0) { + minMap.put(fam, row); + } + if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), maxMap.get(fam), 0, + maxMap.get(fam).length) > 0) { + maxMap.put(fam, row); + } + } + byteCount += kv.getLength(); + // TODO : This can be moved to an interface so that we could collect guide posts in different ways + if (byteCount >= guidepostDepth) { + if (guidePostsMap.get(fam) != null) { + guidePostsMap.get(fam).add( + row); + } else { + List<byte[]> guidePosts = new ArrayList<byte[]>(); + guidePosts.add(row); + guidePostsMap.put(fam, guidePosts); + } + // reset the count for the next key + byteCount = 0; + } + } + + @Override + public byte[] getMaxKey(String fam) { + if (maxMap.get(fam) != null) { return maxMap.get(fam); } + return null; + } + + @Override + public byte[] getMinKey(String fam) { + if (minMap.get(fam) != null) { return minMap.get(fam); } + return null; + } + + @Override + public byte[] getGuidePosts(String fam) { + if (!guidePostsMap.isEmpty()) { + List<byte[]> guidePosts = guidePostsMap.get(fam); + if (guidePosts != null) { + byte[][] array = new byte[guidePosts.size()][]; + int i = 0; + for (byte[] element : guidePosts) { + array[i] = element; + i++; + } + PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array); + return 
PDataType.VARBINARY_ARRAY.toBytes(phoenixArray); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java new file mode 100644 index 0000000..09174b2 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.phoenix.schema.stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TimeKeeper; + +/** + * The scanner that does the scanning to collect the stats during major compaction.{@link StatisticsCollector} + */ +public class StatisticsScanner implements InternalScanner { + private static final Log LOG = LogFactory.getLog(StatisticsScanner.class); + private InternalScanner delegate; + private StatisticsTable stats; + private HRegionInfo region; + private StatisticsTracker tracker; + private byte[] family; + + public StatisticsScanner(StatisticsTracker tracker, StatisticsTable stats, HRegionInfo region, + InternalScanner delegate, byte[] family) { + // should there be only one tracker? 
+ this.tracker = tracker; + this.stats = stats; + this.delegate = delegate; + this.region = region; + this.family = family; + this.tracker.clear(); + } + + @Override + public boolean next(List<Cell> result) throws IOException { + boolean ret = delegate.next(result); + updateStat(result); + return ret; + } + + @Override + public boolean next(List<Cell> result, int limit) throws IOException { + boolean ret = delegate.next(result, limit); + updateStat(result); + return ret; + } + + /** + * Update the current statistics based on the latest batch of key-values from the underlying scanner + * + * @param results + * next batch of {@link KeyValue}s + */ + protected void updateStat(final List<Cell> results) { + for (Cell c : results) { + KeyValue kv = KeyValueUtil.ensureKeyValue(c); + if (c.getTypeByte() == KeyValue.Type.Put.getCode()) { + tracker.updateStatistic(kv); + } + } + } + + /** + * Writes the statistics gathered by the tracker to the statistics table (delete of the old entry, add of the + * new entry, then one batch commit) and afterwards closes the delegate scanner. A failure in either step is + * logged together with its exception and is not rethrown, so the delegate is always asked to close. + */ + public void close() throws IOException { + IOException toThrow = null; + try { + // update the statistics table + // Just verify if this is fine + String tableName = SchemaUtil.getTableNameFromFullName(region.getTable().getNameAsString()); + ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + + " as part of major compaction"); + } + stats.deleteStats(tableName, region.getRegionNameAsString(), this.tracker, Bytes.toString(family), + mutations, currentTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding new stats for the region " + region.getRegionNameAsString() + + " as part of major compaction"); + } + stats.addStats(tableName, region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations, + currentTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing new stats for the region " + region.getRegionNameAsString() + + " as part of major compaction"); + } + stats.commitStats(mutations); + } 
catch (IOException e) { + LOG.error("Failed to update statistics table!", e); + toThrow = e; + } + // close the delegate scanner + try { + delegate.close(); + } catch (IOException e) { + // Pass the exception to the logger so the cause and stack trace are not lost + LOG.error("Error while closing the scanner", e); + // TODO : We should throw the exception + /*if (toThrow == null) { throw e; } + throw MultipleIOException.createIOException(Lists.newArrayList(toThrow, e));*/ + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java new file mode 100644 index 0000000..fcbbee9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.schema.stat; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PDataType; + +/** + * Wrapper to access the statistics table SYSTEM.STATS using the HTable. + */ +@SuppressWarnings("deprecation") +public class StatisticsTable implements Closeable { + /** Map of the currently open statistics tables */ + private static final Map<String, StatisticsTable> tableMap = new HashMap<String, StatisticsTable>(); + /** + * @param env + * Environment wherein the coprocessor is attempting to update the stats table. + * @param primaryTableName + * name of the primary table on which we should collect stats + * @return the {@link StatisticsTable} for the given primary table. + * @throws IOException + * if the table cannot be created due to an underlying HTable creation error + */ + public synchronized static StatisticsTable getStatisticsTableForCoprocessor(CoprocessorEnvironment env, + byte[] primaryTableName) throws IOException { + StatisticsTable table = tableMap.get(primaryTableName); + if (table == null) { + // Map the statics table and the table with which the statistics is + // associated. 
This is a workaround + HTablePool pool = new HTablePool(env.getConfiguration(), 1); + HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); + table = new StatisticsTable(hTable, primaryTableName); + tableMap.put(Bytes.toString(primaryTableName), table); + } + return table; + } + + private final HTableInterface statisticsTable; + private final byte[] sourceTableName; + + private StatisticsTable(HTableInterface statsTable, byte[] sourceTableName) { + this.statisticsTable = statsTable; + this.sourceTableName = sourceTableName; + } + + public StatisticsTable(Configuration conf, HTableDescriptor source) throws IOException { + this(new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), source.getName()); + } + + /** + * Close the connection to the table + */ + @Override + public void close() throws IOException { + statisticsTable.close(); + } + + /** + * Update a list of statistics for a given region. If the ANALYZE <tablename> query is issued + * then we use Upsert queries to update the table + * If the region gets splitted or the major compaction happens we update using HTable.put() + * @param tablekey - The table name + * @param schemaName - the schema name associated with the table + * @param region name - the region of the table for which the stats are collected + * @param tracker - the statistics tracker + * @param fam - the family for which the stats is getting collected. + * @param split - if the updation is caused due to a split + * @param mutations - list of mutations that collects all the mutations to commit in a batch + * @param currentTime - the current time + * @throws IOException + * if we fail to do any of the puts. 
Any single failure will prevent any future attempts for the remaining list of stats to + * update + */ + public void addStats(String tableName, String regionName, StatisticsTracker tracker, String fam, + List<Mutation> mutations, long currentTime) throws IOException { + if (tracker == null) { return; } + + // Add the timestamp header + formLastUpdatedStatsMutation(tableName, currentTime, mutations); + + byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam), + PDataType.VARCHAR.toBytes(regionName)); + formStatsUpdateMutation(tracker, fam, mutations, currentTime, prefix); + } + + /** + * Applies the collected deletes and puts to the statistics table in a single batch call. + * + * @param mutations + * the mutations to commit + * @throws IOException + * if the batch call throws one, or if the thread is interrupted while the batch is in progress; in the + * latter case the interrupt status is restored and the InterruptedException is attached as the cause + */ + public void commitStats(List<Mutation> mutations) throws IOException { + Object[] res = new Object[mutations.size()]; + try { + statisticsTable.batch(mutations, res); + } catch (InterruptedException e) { + // Restore the interrupt status so that code further up the stack can still react to it + Thread.currentThread().interrupt(); + throw new IOException("Exception while adding deletes and puts", e); + } + } + + private void formStatsUpdateMutation(StatisticsTracker tracker, String fam, List<Mutation> mutations, + long currentTime, byte[] prefix) { + Put put = new Put(prefix, currentTime); + if (tracker.getGuidePosts(fam) != null) { + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES, + currentTime, (tracker.getGuidePosts(fam))); + } + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES, + currentTime, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam))); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES, + currentTime, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam))); + mutations.add(put); + } + + private void formLastUpdatedStatsMutation(String tableName, long currentTime, List<Mutation> mutations) throws IOException { + byte[] prefix = StatisticsUtils.getRowKeyForTSUpdate(PDataType.VARCHAR.toBytes(tableName)); + Put put = new Put(prefix); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, currentTime, + PDataType.DATE.toBytes(new Date(currentTime))); + mutations.add(put); + } + + public void deleteStats(String tableName, String regionName, StatisticsTracker tracker, String fam, + List<Mutation> mutations, long currentTime) + throws IOException { + byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam), + PDataType.VARCHAR.toBytes(regionName)); + mutations.add(new Delete(prefix, currentTime - 1)); + } + + /** + * @return the underlying {@link HTableInterface} to which this table is writing + */ + HTableInterface getUnderlyingTable() { + return statisticsTable; + } + + byte[] getSourceTableName() { + return this.sourceTableName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java new file mode 100644 index 0000000..e1754f3 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stat; + +import org.apache.hadoop.hbase.KeyValue; + +/** + * Track a statistic for the column on a given region + */ +public interface StatisticsTracker { + + /** + * Reset the statistic after the completion of the compaction + */ + public void clear(); + + /** + * Update the current statistics with the next {@link KeyValue} to be written + * + * @param kv + * next {@link KeyValue} to be written. + */ + public void updateStatistic(KeyValue kv); + + /** + * Return the max key of the family + * @param fam + * @return + */ + public byte[] getMaxKey(String fam); + + /** + * Return the min key of the family + * + * @param fam + * @return + */ + public byte[] getMinKey(String fam); + + /** + * Return the guide posts of the family + * + * @param fam + * @return + */ + public byte[] getGuidePosts(String fam); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java new file mode 100644 index 0000000..7cb3a38 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stat; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.TrustedByteArrayOutputStream; +/** + * Simple utility class for managing multiple key parts of the statistic + */ +public class StatisticsUtils { + + private StatisticsUtils() { + // private ctor for utility classes + } + + /** Number of parts in our complex key */ + protected static final int NUM_KEY_PARTS = 3; + + public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) throws IOException { + // always starts with the source table + TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length + region.length + + fam.length + (NUM_KEY_PARTS - 1)); + os.write(table); + os.write(QueryConstants.SEPARATOR_BYTE_ARRAY); + os.write(fam); + os.write(QueryConstants.SEPARATOR_BYTE_ARRAY); + os.write(region); + os.close(); + return os.getBuffer(); + } + + public static byte[] getRowKeyForTSUpdate(byte[] table) throws IOException { + // always starts with the source table + TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length); + os.write(table); + os.close(); + 
return os.getBuffer(); + } + + public static byte[] getCFFromRowKey(byte[] table, byte[] row, int rowOffset, int rowLength) { + // Move over the the sepeartor byte that would be written after the table name + int startOff = Bytes.indexOf(row, table) + (table.length) + 1; + int endOff = startOff; + while (endOff < rowLength) { + // Check for next seperator byte + if (row[endOff] != QueryConstants.SEPARATOR_BYTE) { + endOff++; + } else { + break; + } + } + int cfLength = endOff - startOff; + byte[] cf = new byte[cfLength]; + System.arraycopy(row, startOff, cf, 0, cfLength); + return cf; + } + + public static byte[] copyRow(KeyValue kv) { + return Arrays.copyOfRange(kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index e17e9bf..37285f6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -19,7 +19,6 @@ package org.apache.phoenix.query; import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; import static org.apache.phoenix.util.TestUtil.ATABLE_NAME; @@ -51,7 +50,6 @@ import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME; import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME; import static 
org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME; import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME; -import static org.apache.phoenix.util.TestUtil.LOCALHOST; import static org.apache.phoenix.util.TestUtil.MDTEST_NAME; import static org.apache.phoenix.util.TestUtil.MILLIS_IN_DAY; import static org.apache.phoenix.util.TestUtil.MULTI_CF_NAME; @@ -125,7 +123,12 @@ import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; -import org.apache.phoenix.util.*; +import org.apache.phoenix.util.ConfigUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; import org.junit.Assert; import com.google.common.collect.ImmutableMap; @@ -548,6 +551,7 @@ public abstract class BaseTest { conf.setInt("hbase.hlog.asyncer.number", 2); conf.setInt("hbase.assignment.zkevent.workers", 5); conf.setInt("hbase.assignment.threads.max", 5); + conf.setInt(QueryServices.HISTOGRAM_BYTE_DEPTH_CONF_KEY, 20); return conf; } @@ -1225,13 +1229,12 @@ public abstract class BaseTest { try { HTableDescriptor[] tables = admin.listTables(); for (HTableDescriptor table : tables) { - boolean isCatalogTable = (Bytes.compareTo(table.getName(), PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES) == 0); - boolean isSequenceTable = (Bytes.compareTo(table.getName(), PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES) == 0); - if (!isCatalogTable && !isSequenceTable) { + String schemaName = SchemaUtil.getSchemaNameFromFullName(table.getName()); + if (!QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName)) { admin.disableTable(table.getName()); admin.deleteTable(table.getName()); } - } + } } finally { admin.close(); } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 47f5b1b..2e3c8f7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -32,7 +32,7 @@ import org.apache.phoenix.util.ReadOnlyProps; */ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { - private static final int DEFAULT_THREAD_POOL_SIZE = 8; + private static final int DEFAULT_THREAD_POOL_SIZE = 20; private static final int DEFAULT_QUEUE_SIZE = 0; // TODO: setting this down to 5mb causes insufficient memory exceptions. Need to investigate why private static final int DEFAULT_MAX_MEMORY_PERC = 30; // 30% of heap @@ -42,8 +42,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { private static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100; private static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 60000 * 60; // 1HR (to prevent age-out of hash cache during debugging) private static final long DEFAULT_MAX_HASH_CACHE_SIZE = 1024*1024*10; // 10 Mb - private static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 4; - private static final int DEFAULT_MAX_QUERY_CONCURRENCY = 8; private static final boolean DEFAULT_DROP_METADATA = false; private static final int DEFAULT_MASTER_INFO_PORT = -1; @@ -69,8 +67,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS) .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC) .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE) - .setTargetQueryConcurrency(DEFAULT_TARGET_QUERY_CONCURRENCY) - 
.setMaxQueryConcurrency(DEFAULT_MAX_QUERY_CONCURRENCY) .setRowKeyOrderSaltedTable(true) .setMaxServerCacheTTLMs(DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS) .setMasterInfoPort(DEFAULT_MASTER_INFO_PORT) http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-protocol/src/main/MetaDataService.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto index a766674..9174b4d 100644 --- a/phoenix-protocol/src/main/MetaDataService.proto +++ b/phoenix-protocol/src/main/MetaDataService.proto @@ -92,6 +92,16 @@ message GetVersionResponse { required int64 version = 1; } +message ClearCacheForTableRequest { + required bytes tenantId = 1; + required bytes schemaName = 2; + required bytes tableName = 3; + required int64 clientTimestamp = 4; +} + +message ClearCacheForTableResponse { +} + service MetaDataService { rpc getTable(GetTableRequest) returns (MetaDataResponse); @@ -115,5 +125,8 @@ service MetaDataService { returns (ClearCacheResponse); rpc getVersion(GetVersionRequest) - returns (GetVersionResponse); + returns (GetVersionResponse); + + rpc clearCacheForTable(ClearCacheForTableRequest) + returns (ClearCacheForTableResponse); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-protocol/src/main/PTable.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto index 20c63e1..3b5f5cf 100644 --- a/phoenix-protocol/src/main/PTable.proto +++ b/phoenix-protocol/src/main/PTable.proto @@ -72,6 +72,6 @@ message PTable { optional bytes viewStatement = 18; repeated bytes physicalNames = 19; optional bytes tenantId = 20; - optional int32 viewIndexId = 21; + optional int32 viewIndexId = 21; optional bytes indexType = 22; } 
// Protocol definitions for the statistics collection coprocessor service.
// The generated Java code is emitted as the outer class StatCollectorProtos
// in the package org.apache.phoenix.coprocessor.generated.
option java_package = "org.apache.phoenix.coprocessor.generated";
option java_outer_classname = "StatCollectorProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;


// Request message of the collectStat RPC. Both fields are optional.
// NOTE(review): presumably startRow/stopRow bound the key range to collect
// statistics over - confirm against the endpoint implementation.
message StatCollectRequest {
  optional bytes startRow = 1;
  optional bytes stopRow = 2;
}

// Response message of the collectStat RPC; rowsScanned is always present.
message StatCollectResponse {
  required uint64 rowsScanned = 1;
}

// Service exposing the single collectStat RPC.
service StatCollectService {
  rpc collectStat(StatCollectRequest)
    returns (StatCollectResponse);
}
