PHOENIX-4785 Unable to write to table if index is made active during retry
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3da21d5e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3da21d5e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3da21d5e Branch: refs/heads/4.x-cdh5.15 Commit: 3da21d5e45b0aceb683eb01867bbe3a8fef0abb1 Parents: d50f3e3 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Fri Jun 22 00:13:50 2018 +0100 Committer: Pedro Boado <pbo...@apache.org> Committed: Wed Oct 17 21:26:17 2018 +0100 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 128 ++++++++++++++----- .../MutableIndexFailureWithNamespaceIT.java | 80 ++++++++++++ .../coprocessor/MetaDataEndpointImpl.java | 30 +++++ .../index/PhoenixIndexFailurePolicy.java | 71 +++++++++- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 +- 5 files changed, 276 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3da21d5e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 715e37f..aac20ac 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -28,10 +28,16 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -104,10 +110,10 @@ public class MutableIndexFailureIT extends BaseTest { private final boolean throwIndexWriteFailure; private String schema = generateUniqueName(); private List<CommitException> exceptions = Lists.newArrayList(); - private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment; - private static final int forwardOverlapMs = 1000; - private static final int disableTimestampThresholdMs = 10000; - private static final int numRpcRetries = 2; + protected static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment; + protected static final int forwardOverlapMs = 1000; + protected static final int disableTimestampThresholdMs = 10000; + protected static final int numRpcRetries = 2; public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) { this.transactional = transactional; @@ -127,6 +133,23 @@ public class MutableIndexFailureIT extends BaseTest { @BeforeClass public static void doSetup() throws Exception { + Map<String, String> serverProps = getServerProps(); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); + clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + NUM_SLAVES_BASE = 4; + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + indexRebuildTaskRegionEnvironment = + (RegionCoprocessorEnvironment) getUtility() + .getRSForFirstRegionInTable( + PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .get(0).getCoprocessorHost() + .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName()); + MetaDataRegionObserver.initRebuildIndexConnectionProps( + indexRebuildTaskRegionEnvironment.getConfiguration()); + } + + protected static Map<String,String> getServerProps(){ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); @@ -142,19 +165,7 @@ public class MutableIndexFailureIT extends BaseTest { * because we want to control it's execution ourselves */ serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE)); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); - clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); - NUM_SLAVES_BASE = 4; - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); - indexRebuildTaskRegionEnvironment = - (RegionCoprocessorEnvironment) getUtility() - .getRSForFirstRegionInTable( - PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) - .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) - .get(0).getCoprocessorHost() - .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName()); - MetaDataRegionObserver.initRebuildIndexConnectionProps( - indexRebuildTaskRegionEnvironment.getConfiguration()); + return serverProps; } @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}") // name is used by failsafe as file name in reports @@ -162,16 +173,10 @@ public class MutableIndexFailureIT extends BaseTest { return Arrays.asList(new Object[][] { // note - can't disableIndexOnWriteFailure without throwIndexWriteFailure, PHOENIX-4130 { false, false, false, false, false, false}, - { false, false, true, true, false, null}, - { false, false, true, true, false, true}, { false, false, false, true, false, null}, { true, false, false, true, false, null}, - { true, false, true, true, false, null}, - { false, true, true, true, false, null}, { false, true, false, null, false, null}, { true, true, false, true, false, null}, - { true, true, true, null, false, null}, - { false, false, false, false, false, null}, { false, true, false, false, false, null}, { false, false, false, false, false, null}, @@ -180,9 +185,7 @@ public class MutableIndexFailureIT extends BaseTest { { false, true, false, true, false, null}, { false, true, false, true, false, null}, { false, false, false, true, true, null}, - { false, false, true, true, true, null}, { false, false, false, false, true, false}, - { false, false, true, false, true, false}, } ); } @@ -258,6 +261,9 @@ public class MutableIndexFailureIT extends BaseTest { rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); + initializeTable(conn, fullTableName); + addRowsInTableDuringRetry(fullTableName); + // Verify the metadata for index is correct. rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), null, new String[] { PTableType.INDEX.toString() }); @@ -270,8 +276,9 @@ public class MutableIndexFailureIT extends BaseTest { assertTrue(rs.next()); assertEquals(thirdIndexName, rs.getString(3)); assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); - initializeTable(conn, fullTableName); - + // we should be able to write to ACTIVE index even in case of disable index on failure policy + addRowToTable(conn, fullTableName); + query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER " @@ -402,16 +409,67 @@ public class MutableIndexFailureIT extends BaseTest { stmt.setString(2, "x"); stmt.setString(3, "1"); stmt.execute(); - stmt.setString(1, "b"); - stmt.setString(2, "y"); - stmt.setString(3, "2"); - stmt.execute(); + conn.commit(); + } + + private void addRowToTable(Connection conn, String tableName) throws SQLException { + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); stmt.setString(1, "c"); stmt.setString(2, "z"); stmt.setString(3, "3"); stmt.execute(); conn.commit(); + } + private void addRowsInTableDuringRetry(final String tableName) + throws SQLException, InterruptedException, ExecutionException { + int threads=10; + boolean wasFailWrite = FailingRegionObserver.FAIL_WRITE; + boolean wasToggleFailWriteForRetry = FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY; + try { + Callable callable = new Callable() { + + @Override + public Boolean call() { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped)); + try (Connection conn = driver.connect(url, props)) { + // In case of disable index on failure policy, INDEX will be in PENDING_DISABLE on first retry + // but will + // become active if retry is successfull + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); + stmt.setString(1, "b"); + stmt.setString(2, "y"); + stmt.setString(3, "2"); + stmt.execute(); + if (!leaveIndexActiveOnFailure && !transactional) { + FailingRegionObserver.FAIL_WRITE = true; + FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = true; + } + conn.commit(); + } catch (SQLException e) { + return false; + } + return true; + } + }; + ExecutorService executor = Executors.newFixedThreadPool(threads); + List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(); + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(callable)); + } + for (Future<Boolean> future : futures) { + Boolean isSuccess = future.get(); + // transactions can have conflict so ignoring the check for them + if (!transactional) { + assertTrue(isSuccess); + } + } + executor.shutdown(); + } finally { + FailingRegionObserver.FAIL_WRITE = wasFailWrite; + FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = wasToggleFailWriteForRetry; + } } private void validateDataWithIndex(Connection conn, String fullTableName, String fullIndexName, boolean localIndex) throws SQLException { @@ -510,6 +568,7 @@ public class MutableIndexFailureIT extends BaseTest { } public static class FailingRegionObserver extends SimpleRegionObserver { + public static boolean TOGGLE_FAIL_WRITE_FOR_RETRY = false; public static volatile boolean FAIL_WRITE = false; public static final String FAIL_INDEX_NAME = "FAIL_IDX"; public static final String FAIL_TABLE_NAME = "FAIL_TABLE"; @@ -520,6 +579,9 @@ public class MutableIndexFailureIT extends BaseTest { if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME) && FAIL_WRITE) { throwException = true; + if (TOGGLE_FAIL_WRITE_FOR_RETRY) { + FAIL_WRITE = !FAIL_WRITE; + } } else { // When local index updates are atomic with data updates, testing a write failure to a local // index won't make sense. @@ -542,7 +604,9 @@ public class MutableIndexFailureIT extends BaseTest { } } if (throwException) { - dropIndex(c); + if (!TOGGLE_FAIL_WRITE_FOR_RETRY) { + dropIndex(c); + } throw new DoNotRetryIOException(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3da21d5e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java new file mode 100644 index 0000000..5ed9e1f --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.BeforeClass; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; + +/* + * This class is to ensure gets its own cluster with Namespace Enabled + */ +public class MutableIndexFailureWithNamespaceIT extends MutableIndexFailureIT { + + public MutableIndexFailureWithNamespaceIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, + Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) { + super(transactional, localIndex, isNamespaceMapped, disableIndexOnWriteFailure, failRebuildTask, + throwIndexWriteFailure); + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = getServerProps(); + serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString()); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); + clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString()); + clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + NUM_SLAVES_BASE = 4; + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + TableName systemTable = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, + true); + indexRebuildTaskRegionEnvironment = (RegionCoprocessorEnvironment)getUtility() + .getRSForFirstRegionInTable(systemTable).getOnlineRegions(systemTable).get(0).getCoprocessorHost() + .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName()); + MetaDataRegionObserver.initRebuildIndexConnectionProps(indexRebuildTaskRegionEnvironment.getConfiguration()); + } + + @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}") // name is used by failsafe as file name in reports + public static List<Object[]> data() { + return Arrays.asList(new Object[][] { + // note - can't disableIndexOnWriteFailure without throwIndexWriteFailure, PHOENIX-4130 + { false, false, true, true, false, null}, + { false, false, true, true, false, true}, + { true, false, true, true, false, null}, + { false, true, true, true, false, null}, + { true, true, true, null, false, null}, + { false, false, true, true, true, null}, + { false, false, true, false, true, false}, + } + ); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3da21d5e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 5e2e4df..ae2fa66 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -3871,7 +3871,37 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return; } } + if (newState == PIndexState.PENDING_DISABLE && currentState != PIndexState.PENDING_DISABLE) { + // reset count for first PENDING_DISABLE + newKVs.add(KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, Bytes.toBytes(0L))); + } + if (currentState == PIndexState.PENDING_DISABLE) { + if (newState == PIndexState.ACTIVE) { + //before making index ACTIVE check if all clients succeed otherwise keep it PENDING_DISABLE + byte[] count = region + .get(new Get(key).addColumn(TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES)) + .getValue(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES); + if (count != null && Bytes.toLong(count) != 0) { + newState = PIndexState.PENDING_DISABLE; + newKVs.remove(disableTimeStampKVIndex); + newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, + INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue()))); + } + } else if (newState == PIndexState.DISABLE) { + //reset the counter for pending disable when transitioning from PENDING_DISABLE to DISABLE + newKVs.add(KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, Bytes.toBytes(0L))); + } + } + + if(newState == PIndexState.ACTIVE||newState == PIndexState.PENDING_ACTIVE||newState == PIndexState.DISABLE){ + newKVs.add(KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, Bytes.toBytes(0L))); + } + if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) { timeStamp = currentStateKV.getTimestamp(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3da21d5e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 55192e7..e7f5ac2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.index; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.sql.SQLException; @@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -441,6 +444,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { public static void doBatchWithRetries(MutateCommand mutateCommand, IndexWriteException iwe, PhoenixConnection connection, ReadOnlyProps config) throws IOException { + incrementPendingDisableCounter(iwe, connection); int maxTries = config.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); long pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE, @@ -475,6 +479,57 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { throw new DoNotRetryIOException(iwe); // send failure back to client } + private static void incrementPendingDisableCounter(IndexWriteException indexWriteException,PhoenixConnection conn) { + try { + Set<String> indexesToUpdate = new HashSet<>(); + if (indexWriteException instanceof MultiIndexWriteFailureException) { + MultiIndexWriteFailureException indexException = + (MultiIndexWriteFailureException) indexWriteException; + List<HTableInterfaceReference> failedIndexes = indexException.getFailedTables(); + if (indexException.isDisableIndexOnFailure() && failedIndexes != null) { + for (HTableInterfaceReference failedIndex : failedIndexes) { + String failedIndexTable = failedIndex.getTableName(); + if (!indexesToUpdate.contains(failedIndexTable)) { + incrementCounterForIndex(conn,failedIndexTable); + indexesToUpdate.add(failedIndexTable); + } + } + } + } else if (indexWriteException instanceof SingleIndexWriteFailureException) { + SingleIndexWriteFailureException indexException = + (SingleIndexWriteFailureException) indexWriteException; + String failedIndex = indexException.getTableName(); + if (indexException.isDisableIndexOnFailure() && failedIndex != null) { + incrementCounterForIndex(conn,failedIndex); + } + } + } catch (Exception handleE) { + LOG.warn("Error while trying to handle index write exception", indexWriteException); + } + } + + private static void incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable) throws IOException { + incrementCounterForIndex(conn, failedIndexTable, 1); + } + + private static void decrementCounterForIndex(PhoenixConnection conn, String failedIndexTable) throws IOException { + incrementCounterForIndex(conn, failedIndexTable, -1); + } + + private static void incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable,long amount) throws IOException { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable); + Increment incr = new Increment(indexTableKey); + incr.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, amount); + try { + conn.getQueryServices() + .getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, + conn.getQueryServices().getProps()).getName()) + .increment(incr); + } catch (SQLException e) { + throw new IOException(e); + } + } + private static boolean canRetryMore(int numRetry, int maxRetries, long canRetryUntil) { // If there is a single try we must not take into account the time. return numRetry < maxRetries @@ -497,13 +552,25 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } private static void updateIndex(String indexFullName, PhoenixConnection conn, - PIndexState indexState) throws SQLException { + PIndexState indexState) throws SQLException, IOException { + //Decrement the counter because we will be here when client give retry after getting failed or succeed + decrementCounterForIndex(conn,indexFullName); + Long indexDisableTimestamp = null; if (PIndexState.DISABLE.equals(indexState)) { LOG.info("Disabling index after hitting max number of index write retries: " + indexFullName); + IndexUtil.updateIndexState(conn, indexFullName, indexState, indexDisableTimestamp); } else if (PIndexState.ACTIVE.equals(indexState)) { LOG.debug("Resetting index to active after subsequent success " + indexFullName); + //At server disabled timestamp will be reset only if there is no other client is in PENDING_DISABLE state + indexDisableTimestamp = 0L; + try { + IndexUtil.updateIndexState(conn, indexFullName, indexState, indexDisableTimestamp); + } catch (SQLException e) { + // It's possible that some other client had made the Index DISABLED already , so we can ignore unallowed + // transition(DISABLED->ACTIVE) + if (e.getErrorCode() != SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION.getErrorCode()) { throw e; } + } } - IndexUtil.updateIndexState(conn, indexFullName, indexState, null); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3da21d5e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 5bcd286..8dd4a88 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -49,7 +49,6 @@ import org.apache.phoenix.expression.function.SQLViewTypeFunction; import org.apache.phoenix.expression.function.SqlTypeNameFunction; import org.apache.phoenix.expression.function.TransactionProviderNameFunction; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.iterate.DelegateResultIterator; import org.apache.phoenix.iterate.MaterializedResultIterator; import org.apache.phoenix.iterate.ResultIterator; @@ -214,6 +213,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY; public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; + public static final byte[] PENDING_DISABLE_COUNT_BYTES = Bytes.toBytes("PENDING_DISABLE_COUNT"); public static final String TYPE_SEQUENCE = "SEQUENCE"; public static final String SYSTEM_FUNCTION_TABLE = "FUNCTION";