Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 45f0004e0 -> a67f74d58
PHOENIX-3045 Data regions in transition forever if RS holding them down during drop index Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a67f74d5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a67f74d5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a67f74d5 Branch: refs/heads/4.x-HBase-0.98 Commit: a67f74d58e80a19acbb81dc266bab1fffea9cfc3 Parents: 45f0004 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Tue Jul 12 23:12:52 2016 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Tue Jul 12 23:12:52 2016 +0530 ---------------------------------------------------------------------- .../org/apache/phoenix/hbase/index/Indexer.java | 15 +-- .../phoenix/hbase/index/write/IndexWriter.java | 14 +- .../hbase/index/write/RecoveryIndexWriter.java | 134 +++++++++++++++++++ .../phoenix/iterate/BaseResultIterators.java | 5 +- .../stats/StatisticsCollectorFactory.java | 21 +-- .../phoenix/schema/stats/StatisticsUtil.java | 27 ++++ 6 files changed, 176 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 0aed2a6..2956470 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; @@ -63,13 +62,13 @@ import org.apache.phoenix.hbase.index.write.IndexFailurePolicy; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache; import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; -import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; import org.cloudera.htrace.Span; import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceScope; - +import org.apache.hadoop.hbase.KeyValue; +import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter; import com.google.common.collect.Multimap; /** @@ -152,11 +151,6 @@ public class Indexer extends BaseRegionObserver { // setup the actual index writer this.writer = new IndexWriter(env, serverName + "-index-writer"); - - // setup the recovery writer that does retries on the failed edits - TrackingParallelWriterIndexCommitter recoveryCommmiter = - new TrackingParallelWriterIndexCommitter(); - try { // get the specified failure policy. We only ever override it in tests, but we need to do it // here @@ -165,10 +159,9 @@ public class Indexer extends BaseRegionObserver { StoreFailuresInCachePolicy.class, IndexFailurePolicy.class); IndexFailurePolicy policy = policyClass.getConstructor(PerRegionIndexWriteCache.class).newInstance(failedIndexEdits); - LOG.debug("Setting up recovery writter with committer: " + recoveryCommmiter.getClass() - + " and failure policy: " + policy.getClass()); + LOG.debug("Setting up recovery writter with failure policy: " + policy.getClass()); recoveryWriter = - new IndexWriter(recoveryCommmiter, policy, env, serverName + "-recovery-writer"); + new RecoveryIndexWriter(policy, env, serverName + "-recovery-writer"); } catch (Exception ex) { throw new IOException("Could not instantiate recovery failure policy!", ex); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java index cbcec3b..831aa16 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java @@ -30,13 +30,13 @@ import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Pair; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + /** * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the * index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed. @@ -171,11 +171,11 @@ public class IndexWriter implements Stoppable { write(resolveTableReferences(toWrite), false); } - public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IndexWriteException { + public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IOException { write(resolveTableReferences(toWrite), allowLocalUpdates); } - - /** + + /** * see {@link #write(Collection)} * @param toWrite * @throws IndexWriteException @@ -190,7 +190,7 @@ public class IndexWriter implements Stoppable { * @param indexUpdates from the index builder * @return pairs that can then be written by an {@link IndexWriter}. */ - public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences( + protected Multimap<HTableInterfaceReference, Mutation> resolveTableReferences( Collection<Pair<Mutation, byte[]>> indexUpdates) { Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap .<HTableInterfaceReference, Mutation> create(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java new file mode 100644 index 0000000..be542bb --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.write; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +/** + * Used to recover failed index edits during WAL replay + * <p> + * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon threads, so it will + * not block the region from shutting down. + */ +public class RecoveryIndexWriter extends IndexWriter { + + private static final Log LOG = LogFactory.getLog(RecoveryIndexWriter.class); + private Set<HTableInterfaceReference> nonExistingTablesList = new HashSet<HTableInterfaceReference>(); + private HBaseAdmin admin; + + /** + * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup + * before calling. + * + * @param committer + * @param policy + * @param env + * @throws IOException + * @throws ZooKeeperConnectionException + * @throws MasterNotRunningException + */ + public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name) + throws MasterNotRunningException, ZooKeeperConnectionException, IOException { + super(new TrackingParallelWriterIndexCommitter(), policy, env, name); + this.admin = new HBaseAdmin(env.getConfiguration()); + } + + @Override + public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IOException { + try { + write(resolveTableReferences(toWrite), allowLocalUpdates); + } catch (MultiIndexWriteFailureException e) { + for (HTableInterfaceReference table : e.getFailedTables()) { + if (!admin.tableExists(table.getTableName())) { + LOG.warn("Failure due to non existing table: " + table.getTableName()); + nonExistingTablesList.add(table); + } else { + throw e; + } + } + } + } + + /** + * Convert the passed index updates to {@link HTableInterfaceReference}s. + * + * @param indexUpdates + * from the index builder + * @return pairs that can then be written by an {@link RecoveryIndexWriter}. + */ + @Override + protected Multimap<HTableInterfaceReference, Mutation> resolveTableReferences( + Collection<Pair<Mutation, byte[]>> indexUpdates) { + Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap + .<HTableInterfaceReference, Mutation> create(); + + // simple map to make lookups easy while we build the map of tables to create + Map<ImmutableBytesPtr, HTableInterfaceReference> tables = new HashMap<ImmutableBytesPtr, HTableInterfaceReference>( + updates.size()); + for (Pair<Mutation, byte[]> entry : indexUpdates) { + byte[] tableName = entry.getSecond(); + ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName); + HTableInterfaceReference table = tables.get(ptr); + if (nonExistingTablesList.contains(table)) { + LOG.debug("Edits found for non existing table: " + table.getTableName() + " so skipping it!!"); + continue; + } + if (table == null) { + table = new HTableInterfaceReference(ptr); + tables.put(ptr, table); + } + updates.put(table, entry.getFirst()); + + } + return updates; + } + + @Override + public void stop(String why) { + super.stop(why); + if (admin != null) { + try { + admin.close(); + } catch (IOException e) { + // closing silently + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 4a797b8..c2a97b5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -49,6 +49,7 @@ import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.PageFilter; @@ -80,11 +81,11 @@ import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; -import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.PTableStats; +import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.LogUtil; @@ -360,7 +361,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if (null == currentSCN) { currentSCN = HConstants.LATEST_TIMESTAMP; } - tableStats = useStats() && table.getType() != PTableType.SYSTEM + tableStats = useStats() && StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName)) ? context.getConnection().getQueryServices().getTableStats(physicalTableName, currentSCN) : PTableStats.EMPTY_STATS; // Used to tie all the scans together during logging http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java index 30c560a..1c65f09 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java @@ -18,15 +18,9 @@ package org.apache.phoenix.schema.stats; import java.io.IOException; -import java.util.Set; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.SchemaUtil; - -import com.google.common.collect.Sets; /** * Provides new {@link DefaultStatisticsCollector} instances based on configuration settings for a @@ -56,19 +50,6 @@ public class StatisticsCollectorFactory { } } - // TODO: make this declarative through new DISABLE_STATS column on SYSTEM.CATALOG table. - // Also useful would be a USE_CURRENT_TIME_FOR_STATS column on SYSTEM.CATALOG table. - private static final Set<TableName> DISABLE_STATS = Sets.newHashSetWithExpectedSize(3); - static { - DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)); - DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME)); - DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME)); - DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)); - DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true)); - DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true)); - DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true)); - DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true)); - } /** * Determines if statistics are enabled (which is the default). This is done on the @@ -78,7 +59,7 @@ public class StatisticsCollectorFactory { */ private static boolean statisticsEnabled(RegionCoprocessorEnvironment env) { return env.getConfiguration().getBoolean(QueryServices.STATS_ENABLED_ATTRIB, true) && - !DISABLE_STATS.contains(env.getRegionInfo().getTable()); + StatisticsUtil.isStatsEnabled(env.getRegionInfo().getTable()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index 5e03be5..db31b69 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -44,14 +46,33 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Sets; /** * Simple utility class for managing multiple key parts of the statistic */ public class StatisticsUtil { + + private static final Set<TableName> DISABLE_STATS = Sets.newHashSetWithExpectedSize(8); + // TODO: make this declarative through new DISABLE_STATS column on SYSTEM.CATALOG table. + // Also useful would be a USE_CURRENT_TIME_FOR_STATS column on SYSTEM.CATALOG table. + static { + DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)); + DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME)); + DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME)); + DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)); + DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true)); + DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES,true)); + DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES,true)); + DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,true)); + } + private StatisticsUtil() { // private ctor for utility classes } + /** Number of parts in our complex key */ protected static final int NUM_KEY_PARTS = 3; @@ -227,4 +248,10 @@ public class StatisticsUtil { } return ByteUtil.EMPTY_BYTE_ARRAY; } + + public static boolean isStatsEnabled(TableName tableName) { + return !DISABLE_STATS.contains(tableName); + } + + } \ No newline at end of file