This is an automated email from the ASF dual-hosted git repository. tdsilva pushed a commit to branch 4.14-HBase-1.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit d5d7cd6b57cfcceb9ed7e11ffe3694831b55281d Author: Monani Mihir <monani.mi...@gmail.com> AuthorDate: Fri Apr 12 21:48:45 2019 +0530 PHOENIX-5194 Thread Cache is not update for Index retries in for MutationState#send()#doMutation() --- .../org/apache/phoenix/execute/MutationState.java | 45 ++++++++++++++++++---- .../phoenix/index/PhoenixIndexFailurePolicy.java | 10 ++++- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 33cd596..1cbc4bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -41,6 +41,7 @@ import javax.annotation.concurrent.Immutable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -220,7 +221,7 @@ public class MutationState implements SQLCloseable { * Commit a write fence when creating an index so that we can detect when a data table transaction is started before * the create index but completes after it. In this case, we need to rerun the data table transaction after the * index creation so that the index rows are generated. See TEPHRA-157 for more information. - * + * * @param dataTable * the data table upon which an index is being added * @throws SQLException @@ -445,7 +446,7 @@ public class MutationState implements SQLCloseable { /** * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence. * Combine any metrics collected for the newer mutation. - * + * * @param newMutationState * the newer mutation state */ @@ -500,8 +501,8 @@ public class MutationState implements SQLCloseable { final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); - final List<PTable> indexList = includeAllIndexes ? - Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator())) : + final List<PTable> indexList = includeAllIndexes ? + Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator())) : IndexUtil.getClientMaintainedIndexes(table); final Iterator<PTable> indexes = indexList.iterator(); final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); @@ -648,7 +649,7 @@ public class MutationState implements SQLCloseable { /** * Get the unsorted list of HBase mutations for the tables with uncommitted data. - * + * * @return list of HBase mutations for uncommitted data. */ public Iterator<Pair<byte[], List<Mutation>>> toMutations(Long timestamp) { @@ -730,7 +731,7 @@ public class MutationState implements SQLCloseable { /** * Validates that the meta data is valid against the server meta data if we haven't yet done so. Otherwise, for * every UPSERT VALUES call, we'd need to hit the server to see if the meta data has changed. - * + * * @return the server time to use for the upsert * @throws SQLException * if the table or any columns no longer exist @@ -953,7 +954,7 @@ public class MutationState implements SQLCloseable { TableRef origTableRef = tableInfo.getOrigTableRef(); PTable table = origTableRef.getTable(); table.getIndexMaintainers(indexMetaDataPtr, connection); - final ServerCache cache = tableInfo.isDataTable() ? + final ServerCache cache = tableInfo.isDataTable() ? IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table, mutationList, indexMetaDataPtr) : null; // If we haven't retried yet, retry for this case only, as it's possible that @@ -982,7 +983,10 @@ public class MutationState implements SQLCloseable { for (final List<Mutation> mutationBatch : mutationBatchList) { if (shouldRetryIndexedMutation) { // if there was an index write failure, retry the mutation in a loop - final HTableInterface finalHTable = hTable; + final Table finalHTable = hTable; + final ImmutableBytesWritable finalindexMetaDataPtr = + indexMetaDataPtr; + final PTable finalPTable = table; PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() { @Override public void doMutation() throws IOException { @@ -991,6 +995,9 @@ public class MutationState implements SQLCloseable { } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e); + } catch (IOException e) { + e = updateTableRegionCacheIfNecessary(e); + throw e; } } @@ -998,6 +1005,28 @@ public class MutationState implements SQLCloseable { public List<Mutation> getMutationList() { return mutationBatch; } + + private IOException + updateTableRegionCacheIfNecessary(IOException ioe) { + SQLException sqlE = + ServerUtil.parseLocalOrRemoteServerException(ioe); + if (sqlE != null + && sqlE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND + .getErrorCode()) { + try { + connection.getQueryServices().clearTableRegionCache( + finalHTable.getName().getName()); + IndexMetaDataCacheClient.setMetaDataOnMutations( + connection, finalPTable, mutationBatch, + finalindexMetaDataPtr); + } catch (SQLException e) { + return ServerUtil.createIOException( + "Exception during updating index meta data cache", + ioe); + } + } + return ioe; + } }, iwe, connection, connection.getQueryServices().getProps()); } else { hTable.batch(mutationBatch); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 46919d5..c58b5c9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -491,8 +491,14 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } catch (IOException e) { SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(e); if (inferredE == null || inferredE.getErrorCode() != SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { - // if it's not an index write exception, throw exception, to be handled normally in caller's try-catch - throw e; + // If this call is from phoenix client, we also need to check if SQLException + // error is INDEX_METADATA_NOT_FOUND or not + // if it's not an INDEX_METADATA_NOT_FOUND, throw exception, + // to be handled normally in caller's try-catch + if (inferredE.getErrorCode() != SQLExceptionCode.INDEX_METADATA_NOT_FOUND + .getErrorCode()) { + throw e; + } } } catch (InterruptedException e) { Thread.currentThread().interrupt();