Repository: phoenix Updated Branches: refs/heads/3.0 d8766cf77 -> 15a54d557
http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java index 24da14d..01c236f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java @@ -18,6 +18,7 @@ package org.apache.phoenix.schema; import java.util.Collection; +import java.util.List; /** * @@ -51,4 +52,6 @@ public interface PColumnFamily { PColumn getColumn(String name) throws ColumnNotFoundException; int getEstimatedSize(); + + List<byte[]> getGuidePosts(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java index 5ccd50b..15ac8fa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.schema; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -34,6 +35,7 @@ public class PColumnFamilyImpl implements PColumnFamily { private final Map<String, PColumn> columnByString; private final Map<byte[], PColumn> columnByBytes; private final int estimatedSize; + private List<byte[]> guidePosts = Collections.emptyList(); @Override public int getEstimatedSize() { @@ -41,9 +43,23 @@ public class PColumnFamilyImpl implements PColumnFamily { } public PColumnFamilyImpl(PName name, 
List<PColumn> columns) { + this(name, columns, null); + } + + public PColumnFamilyImpl(PName name, List<PColumn> columns, List<byte[]> guidePosts) { Preconditions.checkNotNull(name); - long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 4 + SizedUtil.INT_SIZE + name.getEstimatedSize() + - SizedUtil.sizeOfMap(columns.size()) * 2 + SizedUtil.sizeOfArrayList(columns.size()); + // Include guidePosts also in estimating the size + int guidePostsSize = 0; + if(guidePosts != null) { + guidePostsSize = guidePosts.size(); + for(byte[] gps : guidePosts) { + guidePostsSize += gps.length; + } + Collections.sort(guidePosts, Bytes.BYTES_COMPARATOR); + this.guidePosts = guidePosts; + } + long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 5 + SizedUtil.INT_SIZE + name.getEstimatedSize() + + SizedUtil.sizeOfMap(columns.size()) * 2 + SizedUtil.sizeOfArrayList(columns.size()) + SizedUtil.sizeOfArrayList(guidePostsSize); this.name = name; this.columns = ImmutableList.copyOf(columns); ImmutableMap.Builder<String, PColumn> columnByStringBuilder = ImmutableMap.builder(); @@ -85,4 +101,9 @@ public class PColumnFamilyImpl implements PColumnFamily { } return column; } + + @Override + public List<byte[]> getGuidePosts() { + return guidePosts; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index 0af04da..5bd2cf2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; 
-import org.apache.phoenix.schema.stat.PTableStats; /** @@ -218,10 +217,11 @@ public interface PTable extends Writable { int newKey(ImmutableBytesWritable key, byte[][] values); /** - * Return the statistics table associated with this PTable. + * Return the statistics table associated with this PTable. A list of + * guide posts are return * @return the statistics table. */ - PTableStats getTableStats(); + List<byte[]> getGuidePosts(); RowKeySchema getRowKeySchema(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 74b6c41..6feeaa1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -28,10 +28,10 @@ import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; @@ -95,8 +95,6 @@ public class PTableImpl implements PTable { private ListMultimap<String,PColumn> columnsByName; private PName pkName; private Integer bucketNum; - // Statistics associated with this table. - private PTableStats stats; private RowKeySchema rowKeySchema; // Indexes associated with this table. 
private List<PTable> indexes; @@ -114,6 +112,7 @@ public class PTableImpl implements PTable { private ViewType viewType; private Short viewIndexId; private int estimatedSize; + private List<byte[]> guidePosts = Collections.emptyList(); public PTableImpl() { this.indexes = Collections.emptyList(); @@ -211,6 +210,17 @@ public class PTableImpl implements PTable { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId); } + + public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, + PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, + List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, + List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, + boolean multiTenant, ViewType viewType, Short viewIndexId, PTableStats stats) + throws SQLException { + return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, + bucketNum, columns, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, + viewExpression, disableWAL, multiTenant, viewType, viewIndexId, stats); + } private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, @@ -218,6 +228,16 @@ public class PTableImpl implements PTable { init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, new PTableStatsImpl(), dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, 
viewIndexId); } + + private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, + long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, List<PColumn> columns, + PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, + PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType, + Short viewIndexId, PTableStats stats) throws SQLException { + init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, + stats, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, + disableWAL, multiTenant, viewType, viewIndexId); + } @Override public boolean isMultiTenant() { @@ -327,16 +347,40 @@ public class PTableImpl implements PTable { columnsInFamily.add(column); } } - this.rowKeySchema = builder.build(); estimatedSize += rowKeySchema.getEstimatedSize(); Iterator<Map.Entry<PName,List<PColumn>>> iterator = familyMap.entrySet().iterator(); PColumnFamily[] families = new PColumnFamily[familyMap.size()]; + if (families.length == 0) { + if(stats != null) { + byte[] defaultFamilyNameBytes = null; + if(defaultFamilyName == null) { + defaultFamilyNameBytes = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; + } else { + defaultFamilyNameBytes = defaultFamilyName.getBytes(); + } + if (stats.getGuidePosts().get(defaultFamilyNameBytes) != null) { + guidePosts = stats.getGuidePosts().get(defaultFamilyNameBytes); + if (guidePosts != null) { + Collections.sort(guidePosts, Bytes.BYTES_COMPARATOR); + estimatedSize += SizedUtil.sizeOfArrayList(guidePosts.size()); + for (byte[] gps : guidePosts) { + estimatedSize += gps.length; + } + } + } + } + } ImmutableMap.Builder<String, PColumnFamily> familyByString = ImmutableMap.builder(); - ImmutableSortedMap.Builder<byte[], PColumnFamily> familyByBytes = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR); + 
ImmutableSortedMap.Builder<byte[], PColumnFamily> familyByBytes = ImmutableSortedMap + .orderedBy(Bytes.BYTES_COMPARATOR); + List<byte[]> famGuidePosts = null; for (int i = 0; i < families.length; i++) { Map.Entry<PName,List<PColumn>> entry = iterator.next(); - PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue()); + if (stats != null) { + famGuidePosts = stats.getGuidePosts().get(entry.getKey().getBytes()); + } + PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue(), famGuidePosts); families[i] = family; familyByString.put(family.getName().getString(), family); familyByBytes.put(family.getName().getBytes(), family); @@ -347,8 +391,6 @@ public class PTableImpl implements PTable { this.familyByString = familyByString.build(); estimatedSize += SizedUtil.sizeOfArrayList(families.length); estimatedSize += SizedUtil.sizeOfMap(families.length) * 2; - - this.stats = stats; this.indexes = indexes == null ? Collections.<PTable>emptyList() : indexes; for (PTable index : this.indexes) { estimatedSize += index.getEstimatedSize(); @@ -687,8 +729,8 @@ public class PTableImpl implements PTable { } @Override - public PTableStats getTableStats() { - return stats; + public List<byte[]> getGuidePosts() { + return guidePosts; } @Override @@ -732,14 +774,14 @@ public class PTableImpl implements PTable { indexes.add(index); } boolean isImmutableRows = input.readBoolean(); - Map<String, byte[][]> guidePosts = new HashMap<String, byte[][]>(); + TreeMap<byte[], List<byte[]>> guidePosts = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR); int size = WritableUtils.readVInt(input); - for (int i=0; i<size; i++) { - String key = WritableUtils.readString(input); + for (int i = 0; i < size; i++) { + byte[] key = Bytes.readByteArray(input); int valueSize = WritableUtils.readVInt(input); - byte[][] value = new byte[valueSize][]; - for (int j=0; j<valueSize; j++) { - value[j] = Bytes.readByteArray(input); + List<byte[]> value = 
Lists.newArrayListWithExpectedSize(valueSize); + for (int j = 0; j < valueSize; j++) { + value.add(j, Bytes.readByteArray(input)); } guidePosts.put(key, value); } @@ -809,6 +851,17 @@ public class PTableImpl implements PTable { index.write(output); } output.writeBoolean(isImmutableRows); + TreeMap<byte[], List<byte[]>> guidePosts = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR); + if(this.families.size() == 0) { + byte[] fam = (defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : defaultFamilyName.getBytes()); + guidePosts.put(fam, this.guidePosts); + } else { + // Get the stats from the PColumnFamily + for(PColumnFamily fam : families) { + guidePosts.put(fam.getName().getBytes(), fam.getGuidePosts()); + } + } + PTableStats stats = new PTableStatsImpl(guidePosts); stats.write(output); Bytes.writeByteArray(output, parentTableName == null ? ByteUtil.EMPTY_BYTE_ARRAY : parentTableName.getBytes()); Bytes.writeByteArray(output, defaultFamilyName == null ? ByteUtil.EMPTY_BYTE_ARRAY : defaultFamilyName.getBytes()); @@ -940,4 +993,5 @@ public class PTableImpl implements PTable { public PTableKey getKey() { return key; } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java index 2e38f26..be6cfd2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java @@ -19,8 +19,8 @@ package org.apache.phoenix.schema.stat; import java.io.DataOutput; import java.io.IOException; - -import org.apache.hadoop.hbase.HRegionInfo; +import java.util.List; +import java.util.TreeMap; /** @@ -39,7 +39,7 @@ public interface PTableStats { * 
@param region * @return array of keys */ - byte[][] getRegionGuidePosts(HRegionInfo region); + TreeMap<byte[], List<byte[]>> getGuidePosts(); void write(DataOutput output) throws IOException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java index a6f6dae..88ce1fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java @@ -19,15 +19,13 @@ package org.apache.phoenix.schema.stat; import java.io.DataOutput; import java.io.IOException; -import java.util.Map; +import java.util.List; import java.util.Map.Entry; +import java.util.TreeMap; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; -import com.google.common.collect.ImmutableMap; - /** * Implementation for PTableStats. @@ -36,32 +34,36 @@ public class PTableStatsImpl implements PTableStats { // The map for guide posts should be immutable. We only take the current snapshot from outside // method call and store it. 
- private Map<String, byte[][]> regionGuidePosts; - - public PTableStatsImpl() { } + + public static final PTableStats NO_STATS = new PTableStatsImpl(); + private TreeMap<byte[], List<byte[]>> guidePosts = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR); - public PTableStatsImpl(Map<String, byte[][]> stats) { - regionGuidePosts = ImmutableMap.copyOf(stats); + public PTableStatsImpl() { + this(new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR)); } + public PTableStatsImpl(TreeMap<byte[], List<byte[]>> guidePosts) { + this.guidePosts = guidePosts; + } + @Override - public byte[][] getRegionGuidePosts(HRegionInfo region) { - return regionGuidePosts.get(region.getRegionNameAsString()); + public TreeMap<byte[], List<byte[]>> getGuidePosts() { + return guidePosts; } @Override public void write(DataOutput output) throws IOException { - if (regionGuidePosts == null) { + if (guidePosts == null) { WritableUtils.writeVInt(output, 0); return; } - WritableUtils.writeVInt(output, regionGuidePosts.size()); - for (Entry<String, byte[][]> entry : regionGuidePosts.entrySet()) { - WritableUtils.writeString(output, entry.getKey()); - byte[][] value = entry.getValue(); - WritableUtils.writeVInt(output, value.length); - for (int i=0; i<value.length; i++) { - Bytes.writeByteArray(output, value[i]); + WritableUtils.writeVInt(output, guidePosts.size()); + for (Entry<byte[], List<byte[]>> entry : guidePosts.entrySet()) { + Bytes.writeByteArray(output, entry.getKey()); + List<byte[]> value = entry.getValue(); + WritableUtils.writeVInt(output, value.size()); + for (int i = 0; i < value.size(); i++) { + Bytes.writeByteArray(output, value.get(i)); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java.orig ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java.orig 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java.orig new file mode 100644 index 0000000..a6f6dae --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java.orig @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stat; + +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +import com.google.common.collect.ImmutableMap; + + +/** + * Implementation for PTableStats. + */ +public class PTableStatsImpl implements PTableStats { + + // The map for guide posts should be immutable. We only take the current snapshot from outside + // method call and store it. 
+ private Map<String, byte[][]> regionGuidePosts; + + public PTableStatsImpl() { } + + public PTableStatsImpl(Map<String, byte[][]> stats) { + regionGuidePosts = ImmutableMap.copyOf(stats); + } + + @Override + public byte[][] getRegionGuidePosts(HRegionInfo region) { + return regionGuidePosts.get(region.getRegionNameAsString()); + } + + @Override + public void write(DataOutput output) throws IOException { + if (regionGuidePosts == null) { + WritableUtils.writeVInt(output, 0); + return; + } + WritableUtils.writeVInt(output, regionGuidePosts.size()); + for (Entry<String, byte[][]> entry : regionGuidePosts.entrySet()) { + WritableUtils.writeString(output, entry.getKey()); + byte[][] value = entry.getValue(); + WritableUtils.writeVInt(output, value.length); + for (int i=0; i<value.length; i++) { + Bytes.writeByteArray(output, value[i]); + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java.rej ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java.rej b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java.rej new file mode 100644 index 0000000..2bfc847 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java.rej @@ -0,0 +1,74 @@ +*************** +*** 16,55 **** + * limitations under the License. + */ + package org.apache.phoenix.schema.stat; + +- import java.util.Map; +- +- import org.apache.hadoop.hbase.HRegionInfo; +- +- import com.google.common.collect.ImmutableMap; +- +- +- /** + * Implementation for PTableStats. + */ + public class PTableStatsImpl implements PTableStats { + +- // The map for guide posts should be immutable. We only take the current snapshot from outside +- // method call and store it. 
+- private Map<String, byte[][]> regionGuidePosts; + +- public PTableStatsImpl() { } + +- public PTableStatsImpl(Map<String, byte[][]> stats) { +- regionGuidePosts = ImmutableMap.copyOf(stats); + } + +- @Override +- public byte[][] getRegionGuidePosts(HRegionInfo region) { +- return regionGuidePosts.get(region.getRegionNameAsString()); + } + + @Override +- public Map<String, byte[][]> getGuidePosts(){ +- if(regionGuidePosts != null) { +- return ImmutableMap.copyOf(regionGuidePosts); +- } +- +- return null; + } + } +--- 16,46 ---- + * limitations under the License. + */ + package org.apache.phoenix.schema.stat; ++ import java.util.List; ++ import java.util.TreeMap; + ++ import org.apache.hadoop.hbase.util.Bytes; ++ ++ /** + * Implementation for PTableStats. + */ + public class PTableStatsImpl implements PTableStats { + ++ public static final PTableStats NO_STATS = new PTableStatsImpl(); + ++ private TreeMap<byte[], List<byte[]>> guidePosts = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR); + ++ public PTableStatsImpl() { ++ this(new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR)); + } + ++ public PTableStatsImpl(TreeMap<byte[], List<byte[]>> guidePosts) { ++ this.guidePosts = guidePosts; + } + + @Override ++ public TreeMap<byte[], List<byte[]>> getGuidePosts() { ++ return guidePosts; + } ++ + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java new file mode 100644 index 0000000..b63730c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.schema.stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorException; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.ProtocolSignature; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import 
org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.schema.PhoenixArray; +import org.apache.phoenix.util.TimeKeeper; + +import com.google.common.collect.Lists; + +/** + * An endpoint implementation that allows to collect the stats for a given region and groups the stat per family. This is also an + * RegionObserver that collects the information on compaction also. The user would be allowed to invoke this endpoint and thus populate the + * Phoenix stats table with the max key, min key and guide posts for the given region. 
The stats can be consumed by the stats associated + * with every PTable and the same can be used to parallelize the queries + */ +public class StatisticsCollector extends BaseRegionObserver implements Coprocessor, StatisticsTracker, + StatisticsCollectorProtocol { + + public static void addToTable(HTableDescriptor desc) throws IOException { + desc.addCoprocessor(StatisticsCollector.class.getName()); + } + + private Map<String, byte[]> minMap = new ConcurrentHashMap<String, byte[]>(); + private Map<String, byte[]> maxMap = new ConcurrentHashMap<String, byte[]>(); + private long guidepostDepth; + private long byteCount = 0; + private Map<String, List<byte[]>> guidePostsMap = new ConcurrentHashMap<String, List<byte[]>>(); + private Map<ImmutableBytesPtr, Boolean> familyMap = new ConcurrentHashMap<ImmutableBytesPtr, Boolean>(); + private RegionCoprocessorEnvironment env; + protected StatisticsTable stats; + // Ensures that either analyze or compaction happens at any point of time. + private ReentrantLock lock = new ReentrantLock(); + private static final Log LOG = LogFactory.getLog(StatisticsCollector.class); + + @Override + public StatisticsCollectorResponse collectStat(KeyRange keyRange) throws IOException { + HRegion region = env.getRegion(); + boolean heldLock = false; + int count = 0; + try { + if (lock.tryLock()) { + heldLock = true; + // Clear all old stats + clear(); + Scan scan = createScan(env.getConfiguration()); + scan.setStartRow(keyRange.getLowerRange()); + scan.setStopRow(keyRange.getUpperRange()); + RegionScanner scanner = null; + try { + scanner = region.getScanner(scan); + count = scanRegion(scanner, count); + } catch (IOException e) { + LOG.error(e); + throw e; + } finally { + if (scanner != null) { + try { + ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + writeStatsToStatsTable(region, scanner, true, mutations, TimeKeeper.SYSTEM.getCurrentTime()); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing new stats for the region " + 
region.getRegionInfo()); + } + commitStats(mutations); + } catch (IOException e) { + LOG.error(e); + throw e; + } + } + } + } + } finally { + if (heldLock) { + lock.unlock(); + } + } + StatisticsCollectorResponse response = new StatisticsCollectorResponse(); + response.setRowsScanned(count); + return response; + } + + private void writeStatsToStatsTable(final HRegion region, final RegionScanner scanner, boolean delete, + List<Mutation> mutations, long currentTime) throws IOException { + scanner.close(); + try { + // update the statistics table + for (ImmutableBytesPtr fam : familyMap.keySet()) { + String tableName = region.getRegionInfo().getTableNameAsString(); + if (delete) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting the stats for the region " + region.getRegionInfo()); + } + stats.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this, + Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Adding new stats for the region " + region.getRegionInfo()); + } + stats.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, + Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + } + } catch (IOException e) { + LOG.error("Failed to update statistics table!", e); + throw e; + } + } + + private void commitStats(List<Mutation> mutations) throws IOException { + stats.commitStats(mutations); + } + + private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) + throws IOException { + try { + // update the statistics table + for (ImmutableBytesPtr fam : familyMap.keySet()) { + String tableName = region.getRegionInfo().getTableNameAsString(); + stats.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, + Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + } + } catch (IOException e) { + LOG.error("Failed to delete from statistics table!", e); + throw e; + } + } 
+ + private int scanRegion(RegionScanner scanner, int count) throws IOException { + List<KeyValue> results = new ArrayList<KeyValue>(); + boolean hasMore = true; + while (hasMore) { + // Am getting duplicates here. Need to avoid that + hasMore = scanner.next(results); + updateStat(results); + count += results.size(); + results.clear(); + while (!hasMore) { + break; + } + } + return count; + } + + /** + * Update the current statistics based on the lastest batch of key-values from the underlying scanner + * + * @param results + * next batch of {@link KeyValue}s + */ + protected void updateStat(final List<KeyValue> results) { + for (KeyValue kv : results) { + updateStatistic(kv); + } + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment)env; + } else { + throw new CoprocessorException("Must be loaded on a table region!"); + } + String tableName = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTableNameAsString(); + if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + HTableDescriptor desc = ((RegionCoprocessorEnvironment)env).getRegion().getTableDesc(); + // Get the stats table associated with the current table on which the CP is + // triggered + stats = StatisticsTable.getStatisticsTableForCoprocessor(env, desc.getName()); + guidepostDepth = env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, + QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); + } + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + String tableName = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTableNameAsString(); + // Close only if the table is system table + if (tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + stats.close(); + } + } + } + + @Override + public InternalScanner 
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) + throws IOException { + InternalScanner internalScan = s; + String tableNameAsString = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString(); + if (!tableNameAsString.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + boolean heldLock = false; + try { + if (lock.tryLock()) { + heldLock = true; + // See if this is for Major compaction + if (scanType.equals(ScanType.MAJOR_COMPACT)) { + // this is the first CP accessed, so we need to just create a major + // compaction scanner, just + // like in the compactor + if (s == null) { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + long smallestReadPoint = store.getHRegion().getSmallestReadPoint(); + internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType, + smallestReadPoint, earliestPutTs); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Compaction scanner created for stats"); + } + InternalScanner scanner = getInternalScanner(c, store, internalScan, + store.getColumnFamilyName()); + if (scanner != null) { + internalScan = scanner; + } + } + } + } finally { + if (heldLock) { + lock.unlock(); + } + } + } + return internalScan; + } + + @Override + public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegion l, HRegion r) throws IOException { + // Invoke collectStat here + HRegion region = ctx.getEnvironment().getRegion(); + String tableName = region.getRegionInfo().getTableNameAsString(); + if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + if (familyMap != null) { + familyMap.clear(); + } + // Create a delete operation on the parent region + // Then write the new guide posts for individual regions + // TODO : Try making this atomic + List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3); + long currentTime 
= TimeKeeper.SYSTEM.getCurrentTime(); + Configuration conf = ctx.getEnvironment().getConfiguration(); + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo()); + } + collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime); + clear(); + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo()); + } + collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo()); + } + commitStats(mutations); + } + } + + private void collectStatsForSplitRegions(Configuration conf, HRegion daughter, HRegion parent, boolean delete, + List<Mutation> mutations, long currentTime) throws IOException { + Scan scan = createScan(conf); + RegionScanner scanner = null; + int count = 0; + try { + scanner = daughter.getScanner(scan); + count = scanRegion(scanner, count); + } catch (IOException e) { + LOG.error(e); + throw e; + } finally { + if (scanner != null) { + try { + if (delete) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting the stats for the parent region " + parent.getRegionInfo()); + } + deleteStatsFromStatsTable(parent, mutations, currentTime); + } + writeStatsToStatsTable(daughter, scanner, false, mutations, currentTime); + } catch (IOException e) { + LOG.error(e); + throw e; + } + } + } + } + + private Scan createScan(Configuration conf) { + Scan scan = new Scan(); + scan.setCaching(conf.getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE)); + // do not cache the blocks here + scan.setCacheBlocks(false); + return scan; + } + + protected InternalScanner getInternalScanner(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner internalScan, String family) { + return new StatisticsScanner(this, stats, c.getEnvironment().getRegion().getRegionInfo(), internalScan, + 
Bytes.toBytes(family)); + } + + @Override + public void clear() { + this.maxMap.clear(); + this.minMap.clear(); + this.guidePostsMap.clear(); + this.familyMap.clear(); + } + + @Override + public void updateStatistic(KeyValue kv) { + byte[] cf = kv.getFamily(); + familyMap.put(new ImmutableBytesPtr(cf), true); + + String fam = Bytes.toString(cf); + byte[] row = new ImmutableBytesPtr(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()).copyBytesIfNecessary(); + if (!minMap.containsKey(fam) && !maxMap.containsKey(fam)) { + minMap.put(fam, row); + // Ideally the max key also should be added in this case + maxMap.put(fam, row); + } else { + if (Bytes.compareTo(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), minMap.get(fam), 0, + minMap.get(fam).length) < 0) { + minMap.put(fam, row); + } + if (Bytes.compareTo(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), maxMap.get(fam), 0, + maxMap.get(fam).length) > 0) { + maxMap.put(fam, row); + } + } + byteCount += kv.getLength(); + // TODO : This can be moved to an interface so that we could collect guide posts in different ways + if (byteCount >= guidepostDepth) { + if (guidePostsMap.get(fam) != null) { + guidePostsMap.get(fam).add(row); + } else { + List<byte[]> guidePosts = new ArrayList<byte[]>(); + guidePosts.add(row); + guidePostsMap.put(fam, guidePosts); + } + // reset the count for the next key + byteCount = 0; + } + } + + @Override + public byte[] getMaxKey(String fam) { + if (maxMap.get(fam) != null) { return maxMap.get(fam); } + return null; + } + + @Override + public byte[] getMinKey(String fam) { + if (minMap.get(fam) != null) { return minMap.get(fam); } + return null; + } + + @Override + public byte[] getGuidePosts(String fam) { + if (!guidePostsMap.isEmpty()) { + List<byte[]> guidePosts = guidePostsMap.get(fam); + if (guidePosts != null) { + byte[][] array = new byte[guidePosts.size()][]; + int i = 0; + for (byte[] element : guidePosts) { + array[i] = element; + i++; + } + PhoenixArray phoenixArray = 
new PhoenixArray(PDataType.VARBINARY, array); + return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray); + } + } + return null; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long version, int clientMethodsHashCode) + throws IOException { + return new ProtocolSignature(BaseEndpointCoprocessor.VERSION, null); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return BaseEndpointCoprocessor.VERSION; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectorProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectorProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectorProtocol.java new file mode 100644 index 0000000..ad12883 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectorProtocol.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.schema.stat; + +import java.io.IOException; + +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.phoenix.query.KeyRange; +/** HBase coprocessor endpoint protocol (a {@link CoprocessorProtocol}) through which a client asks for statistics to be collected. */ +public interface StatisticsCollectorProtocol extends CoprocessorProtocol { + /** Asks the endpoint to collect statistics for {@code keyRange}; the returned {@link StatisticsCollectorResponse} carries the number of rows scanned and any recorded IOException. */ + public StatisticsCollectorResponse collectStat(final KeyRange keyRange) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectorResponse.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectorResponse.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectorResponse.java new file mode 100644 index 0000000..adbffb0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectorResponse.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.schema.stat; + +import java.io.IOException; +import java.io.Serializable; +/** Result of {@link StatisticsCollectorProtocol#collectStat}: the number of rows scanned plus an optional {@link IOException}. */ +public class StatisticsCollectorResponse implements Serializable { + private static final long serialVersionUID = -8192337710525997237L; + private long rowsScanned; + private IOException ioException; + // ioException stays null unless a failure is recorded via setIoException + public StatisticsCollectorResponse() { + // nothing to initialize: rowsScanned starts at 0 and ioException at null + } + /** @param rowScanned the number of rows scanned */ + public void setRowsScanned(long rowScanned) { + this.rowsScanned = rowScanned; + } + /** @return the number of rows scanned, 0 if never set */ + public long getRowsScanned() { + return rowsScanned; + } + /** @param ioException the I/O failure to record (may be null) */ + public void setIoException(IOException ioException) { + this.ioException = ioException; + } + /** @return the recorded I/O failure, or null if none was set */ + public IOException getIoException() { + return ioException; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java new file mode 100644 index 0000000..34e52b8 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for + * the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.schema.stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TimeKeeper; + +/** + * Scanner wrapper used during major compaction: every Put {@link KeyValue} returned by the delegate is fed to a {@link StatisticsTracker}, and on close the region's stats row for the family is replaced. See {@link StatisticsCollector}. + */ +public class StatisticsScanner implements InternalScanner { + private static final Log LOG = LogFactory.getLog(StatisticsScanner.class); + private final InternalScanner delegate; + private final StatisticsTable stats; + private final HRegionInfo region; + private final StatisticsTracker tracker; + private final byte[] family; + + public StatisticsScanner(StatisticsTracker tracker, StatisticsTable stats, HRegionInfo region, + InternalScanner delegate, byte[] family) { + // should there be only one tracker?
+ this.tracker = tracker; + this.stats = stats; + this.delegate = delegate; + this.region = region; + this.family = family; + this.tracker.clear(); + } + + @Override + public boolean next(List<KeyValue> result) throws IOException { + boolean ret = delegate.next(result); + updateStat(result); + return ret; + } + + @Override + public boolean next(List<KeyValue> result, int limit) throws IOException { + boolean ret = delegate.next(result, limit); + updateStat(result); + return ret; + } + + /** + * Update the current statistics based on the latest batch of key-values from the underlying scanner; only Put cells are tracked + * + * @param results + * next batch of {@link KeyValue}s + */ + protected void updateStat(final List<KeyValue> results) { + for (KeyValue kv : results) { + if (kv.getType() == KeyValue.Type.Put.getCode()) { + tracker.updateStatistic(kv); + } + } + } + + @Override + public void close() throws IOException { + try { + // Replace this region's stats for the family: queue a delete of the old stats row and puts for the new stats, + // then commit all of them to the statistics table in one batch + String tableName = SchemaUtil.getTableNameFromFullName(region.getTableNameAsString()); + ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + + " as part of major compaction"); + } + stats.deleteStats(tableName, region.getRegionNameAsString(), this.tracker, Bytes.toString(family), + mutations, currentTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding new stats for the region " + region.getRegionNameAsString() + + " as part of major compaction"); + } + stats.addStats(tableName, region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations, + currentTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing new stats for the region " + region.getRegionNameAsString() + + " as part of major compaction"); + } + stats.commitStats(mutations); + } catch (IOException e) { + LOG.error("Failed to update
statistics table!", e); + } finally { + // close the delegate scanner even if the stats update threw a runtime exception; TODO : close failures should be rethrown + try { + delegate.close(); + } catch (IOException e) { + LOG.error("Error while closing the scanner", e); + } + } + } + + @Override + public boolean next(List<KeyValue> result, String metric) throws IOException { + boolean ret = delegate.next(result, metric); + updateStat(result); + return ret; + } + + @Override + public boolean next(List<KeyValue> result, int limit, String metric) throws IOException { + boolean ret = delegate.next(result, limit, metric); + updateStat(result); + return ret; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java new file mode 100644 index 0000000..8e30176 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.schema.stat; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PDataType; + +/** + * Wrapper to access the statistics table SYSTEM.STATS using the HTable. + */ +@SuppressWarnings("deprecation") +public class StatisticsTable implements Closeable { + /** Map of the currently open statistics tables, keyed by primary table name */ + private static final Map<String, StatisticsTable> tableMap = new HashMap<String, StatisticsTable>(); + /** + * @param env + * Environment wherein the coprocessor is attempting to update the stats table. + * @param primaryTableName + * name of the primary table on which we should collect stats + * @return the {@link StatisticsTable} for the given primary table. + * @throws IOException + * if the table cannot be created due to an underlying HTable creation error + */ + public synchronized static StatisticsTable getStatisticsTableForCoprocessor(CoprocessorEnvironment env, + byte[] primaryTableName) throws IOException { + // NOTE(review): tableMap has String keys but is queried with the byte[] name, so this lookup never hits and every call opens a new table; switching to Bytes.toString(primaryTableName) would make regions share one HTable, which is not thread-safe -- settle the sharing model before fixing + StatisticsTable table = tableMap.get(primaryTableName); + if (table == null) { + // Map the statistics table to the primary table whose stats it stores. This is a workaround
+ HTablePool pool = new HTablePool(env.getConfiguration(), 1); + try { + HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); + table = new StatisticsTable(hTable, primaryTableName); + tableMap.put(Bytes.toString(primaryTableName), table); + } finally { + pool.close(); + } + } + return table; + } + + private final HTableInterface statisticsTable; + private final byte[] sourceTableName; + + private StatisticsTable(HTableInterface statsTable, byte[] sourceTableName) { + this.statisticsTable = statsTable; + this.sourceTableName = sourceTableName; + } + + public StatisticsTable(Configuration conf, HTableDescriptor source) throws IOException { + this(new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), source.getName()); + } + + /** + * Close the connection to the table + */ + @Override + public void close() throws IOException { + statisticsTable.close(); + } + + /** + * Appends the mutations that record the statistics of one region and column family to {@code mutations}. + * Nothing is written here: the caller passes the list to {@link #commitStats(List)} to persist it. + * Two puts are added: one for the table's last-update-time row and one for the region/family stats row + * holding the min key, the max key and, when the tracker has any, the guide posts. + * + * @param tableName - the table name used as the row key prefix + * @param regionName - the region of the table for which the stats are collected + * @param tracker - the statistics tracker; if null, no mutation is added + * @param fam - the family for which the stats are collected
+ * @param mutations - list that accumulates all the mutations to commit in a batch + * @param currentTime - the timestamp written with the stats cells + * @throws IOException + * if building a stats row key fails + */ + public void addStats(String tableName, String regionName, StatisticsTracker tracker, String fam, + List<Mutation> mutations, long currentTime) throws IOException { + if (tracker == null) { return; } + + // Add the timestamp header + formLastUpdatedStatsMutation(tableName, currentTime, mutations); + + byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam), + PDataType.VARCHAR.toBytes(regionName)); + formStatsUpdateMutation(tracker, fam, mutations, currentTime, prefix); + } + /** Writes {@code mutations} to the statistics table in one batch call; on interrupt the thread's interrupt flag is restored and an IOException carrying the cause is thrown. */ + public void commitStats(List<Mutation> mutations) throws IOException { + Object[] res = new Object[mutations.size()]; + try { + statisticsTable.batch(mutations, res); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Exception while adding deletes and puts", e); + } + } + + private void formStatsUpdateMutation(StatisticsTracker tracker, String fam, List<Mutation> mutations, + long currentTime, byte[] prefix) { + Put put = new Put(prefix, currentTime); + if (tracker.getGuidePosts(fam) != null) { + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES, + currentTime, (tracker.getGuidePosts(fam))); + } + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES, + currentTime, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam))); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES, + currentTime, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam))); + mutations.add(put); + } + + private void formLastUpdatedStatsMutation(String tableName, long currentTime, List<Mutation> mutations) throws IOException { + byte[] prefix = StatisticsUtils.getRowKeyForTSUpdate(PDataType.VARCHAR.toBytes(tableName)); + Put put = new Put(prefix); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, currentTime, + PDataType.DATE.toBytes(new Date(currentTime))); + mutations.add(put); + } + + public void deleteStats(String tableName, String regionName, StatisticsTracker tracker, String fam, + List<Mutation> mutations, long currentTime) + throws IOException { + byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam), + PDataType.VARCHAR.toBytes(regionName)); + mutations.add(new Delete(prefix, currentTime - 1)); + } + + /** + * @return the underlying {@link HTableInterface} to which this table is writing + */ + HTableInterface getUnderlyingTable() { + return statisticsTable; + } + + byte[] getSourceTableName() { + return this.sourceTableName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java new file mode 100644 index 0000000..e1754f3 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stat; + +import org.apache.hadoop.hbase.KeyValue; + +/** + * Tracks per-column-family statistics (min key, max key, guide posts) gathered while scanning a region + */ +public interface StatisticsTracker { + + /** + * Reset all statistics collected so far (e.g. before a new compaction or split scan) + */ + public void clear(); + + /** + * Update the current statistics with the next {@link KeyValue} to be written + * + * @param kv + * next {@link KeyValue} to be written. + */ + public void updateStatistic(KeyValue kv); + + /** + * Return the largest row key recorded so far for the family + * @param fam the column family name + * @return the max row key; implementations may return null if nothing was recorded for {@code fam} + */ + public byte[] getMaxKey(String fam); + + /** + * Return the smallest row key recorded so far for the family + * + * @param fam the column family name + * @return the min row key; implementations may return null if nothing was recorded for {@code fam} + */ + public byte[] getMinKey(String fam); + + /** + * Return the guide posts of the family, serialized into a single value + * + * @param fam the column family name + * @return the serialized guide posts (StatisticsCollector encodes them as a PDataType.VARBINARY_ARRAY value); may be null + */ + public byte[] getGuidePosts(String fam); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java new file mode 100644 index 0000000..b89c13b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stat; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.TrustedByteArrayOutputStream; +/** + * Helpers for building and parsing the row keys of the statistics table (table, family and region key parts) + */ +public class StatisticsUtils { + + private StatisticsUtils() { + // private ctor for utility classes + } + + /** Number of parts in our complex key */ + protected static final int NUM_KEY_PARTS = 3; + /** Builds a stats row key: table, separator, family, separator, region (separator = QueryConstants.SEPARATOR_BYTE_ARRAY). */ + public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) throws IOException { + // always starts with the source table + TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length + region.length + + fam.length + (NUM_KEY_PARTS - 1)); + os.write(table); + os.write(QueryConstants.SEPARATOR_BYTE_ARRAY); + os.write(fam); + os.write(QueryConstants.SEPARATOR_BYTE_ARRAY); + os.write(region); + os.close(); + return os.getBuffer(); + } + /** Builds the row key holding a table's last-stats-update time, which is just the table name bytes. */ + public static byte[] getRowKeyForTSUpdate(byte[] table) throws IOException { + // always starts with the source table + TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length); +
os.write(table); + os.close(); + return os.getBuffer(); + } + /** Extracts the column family from a row key laid out as by {@link #getRowKey}: the bytes after the table name and its separator, up to the next separator or the end of the row. */ + public static byte[] getCFFromRowKey(byte[] table, byte[] row) { + // Skip the table name and the separator byte written after it. NOTE(review): if table does not occur in row, indexOf returns -1 and this offset is wrong + int startOff = indexOf(row, table) + (table.length) + 1; + int endOff = startOff; + while (endOff < row.length) { + // Stop at the next separator byte + if (row[endOff] != QueryConstants.SEPARATOR_BYTE) { + endOff++; + } else { + break; + } + } + int cfLength = endOff - startOff; + byte[] cf = new byte[cfLength]; + System.arraycopy(row, startOff, cf, 0, cfLength); + return cf; + } + /** Returns a copy of the row key bytes of {@code kv}. */ + public static byte[] copyRow(KeyValue kv) { + return Arrays.copyOfRange(kv.getBuffer(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength()); + } + + /** + * Copied from org.apache.hadoop.hbase.util.Bytes.java + * @param array the array to search in + * @param target the byte sequence to look for + * @return the start index of the first occurrence of target in array, 0 if target is empty, or -1 if it does not occur + */ + public static int indexOf(byte[] array, byte[] target) { + checkNotNull(array, "array"); + checkNotNull(target, "target"); + if (target.length == 0) { + return 0; + } + + outer: + for (int i = 0; i < array.length - target.length + 1; i++) { + for (int j = 0; j < target.length; j++) { + if (array[i + j] != target[j]) { + continue outer; + } + } + return i; + } + return -1; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 62d8c47..050d889 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -1286,13 +1286,12 @@ public abstract class BaseTest { try { HTableDescriptor[] tables = admin.listTables(); for (HTableDescriptor table : tables) { - boolean isCatalogTable =
(Bytes.compareTo(table.getName(), PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES) == 0); - boolean isSequenceTable = (Bytes.compareTo(table.getName(), PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES) == 0); - if (!isCatalogTable && !isSequenceTable) { + String schemaName = SchemaUtil.getSchemaNameFromFullName(table.getName()); + if (!QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName)) { admin.disableTable(table.getName()); admin.deleteTable(table.getName()); } - } + } } finally { admin.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 47f5b1b..a237e9e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -32,7 +32,7 @@ import org.apache.phoenix.util.ReadOnlyProps; */ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { - private static final int DEFAULT_THREAD_POOL_SIZE = 8; + private static final int DEFAULT_THREAD_POOL_SIZE = 16; private static final int DEFAULT_QUEUE_SIZE = 0; // TODO: setting this down to 5mb causes insufficient memory exceptions.
Need to investigate why private static final int DEFAULT_MAX_MEMORY_PERC = 30; // 30% of heap @@ -42,8 +42,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { private static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100; private static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 60000 * 60; // 1HR (to prevent age-out of hash cache during debugging) private static final long DEFAULT_MAX_HASH_CACHE_SIZE = 1024*1024*10; // 10 Mb - private static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 4; - private static final int DEFAULT_MAX_QUERY_CONCURRENCY = 8; private static final boolean DEFAULT_DROP_METADATA = false; private static final int DEFAULT_MASTER_INFO_PORT = -1; @@ -53,6 +51,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { private static final String DEFAULT_WAL_EDIT_CODEC = IndexedWALEditCodec.class.getName(); public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE = 1024L*1024L*4L; // 4 Mb public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE = 1024L*1024L*2L; // 2 Mb + public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 20; public QueryServicesTestImpl(ReadOnlyProps defaultProps) { this(defaultProps, ReadOnlyProps.EMPTY_PROPS); @@ -60,6 +59,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { private static QueryServicesOptions getDefaultServicesOptions() { return withDefaults() + .setHistogramByteDepth(DEFAULT_HISTOGRAM_BYTE_DEPTH) .setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE) .setQueueSize(DEFAULT_QUEUE_SIZE) .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC) @@ -69,8 +69,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS) .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC) .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE) - .setTargetQueryConcurrency(DEFAULT_TARGET_QUERY_CONCURRENCY) - .setMaxQueryConcurrency(DEFAULT_MAX_QUERY_CONCURRENCY) .setRowKeyOrderSaltedTable(true)
.setMaxServerCacheTTLMs(DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS) .setMasterInfoPort(DEFAULT_MASTER_INFO_PORT) http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d2d30f0..cfc0cba 100644 --- a/pom.xml +++ b/pom.xml @@ -114,7 +114,7 @@ <!-- Plugin options --> <numForkedUT>3</numForkedUT> - <numForkedIT>7</numForkedIT> + <numForkedIT>4</numForkedIT> <!-- Set default encoding so multi-byte tests work correctly on the Mac --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
