Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 d830ddfc6 -> aafa07962
PHOENIX-2926 Skip loading data for table having local indexes when there is split during bulkload job-addendum(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aafa0796 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aafa0796 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aafa0796 Branch: refs/heads/4.x-HBase-1.0 Commit: aafa07962d9f129ad0f04f147f91c454b854a3da Parents: d830ddf Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Fri Jul 29 00:07:37 2016 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Fri Jul 29 00:07:37 2016 +0530 ---------------------------------------------------------------------- .../phoenix/mapreduce/AbstractBulkLoadTool.java | 32 ++++++-------------- .../phoenix/mapreduce/index/IndexTool.java | 20 ++++++++++-- .../java/org/apache/phoenix/util/IndexUtil.java | 14 +++++++++ 3 files changed, 42 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/aafa0796/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index a41de8a..f7b7d22 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.mapreduce; +import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -59,6 +60,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; import 
org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -309,33 +311,19 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { if (success) { if (hasLocalIndexes) { - byte[][] splitKeysAfterJob = null; try { table = new HTable(job.getConfiguration(), qualifiedTableName); - splitKeysAfterJob = table.getStartKeys(); + if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, table.getStartKeys())) { + LOG.error("The table " + + qualifiedTableName + + " has local indexes and there is split key mismatch before and" + + " after running bulkload job. Please rerun the job otherwise" + + " there may be inconsistencies between actual data and index data."); + return -1; + } } finally { if (table != null) table.close(); } - boolean matchingSplitKeys = true; - if (splitKeysBeforeJob != null && splitKeysAfterJob != null - && splitKeysBeforeJob.length == splitKeysAfterJob.length) { - for (int i = 0; i < splitKeysBeforeJob.length; i++) { - if (Bytes.compareTo(splitKeysBeforeJob[i], splitKeysAfterJob[i]) != 0) { - matchingSplitKeys = false; - break; - } - } - } else { - matchingSplitKeys = false; - } - if(!matchingSplitKeys) { - LOG.error("The table " - + qualifiedTableName - + " has local indexes and there is split key mismatch before and" - + " after running bulkload job. 
Please rerun the job otherwise" - + " there may be inconsistencies between actual data and index data."); - return -1; - } } LOG.info("Loading HFiles from {}", outputPath); completebulkload(conf,outputPath,tablesToBeLoaded); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aafa0796/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 34c9013..8488123 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -63,6 +63,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; @@ -205,8 +206,10 @@ public class IndexTool extends Configured implements Tool { // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is // computed from the qDataTable name. 
String physicalIndexTable = pindexTable.getPhysicalName().getString(); + boolean isLocalIndexBuild = false; if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { physicalIndexTable = qDataTable; + isLocalIndexBuild = true; } final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class); @@ -247,7 +250,7 @@ public class IndexTool extends Configured implements Tool { configureSubmittableJobUsingDirectApi(job, outputPath, cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt())); } else { - configureRunnableJobUsingBulkLoad(job, outputPath); + configureRunnableJobUsingBulkLoad(job, outputPath, isLocalIndexBuild); // Without direct API, we need to update the index state to ACTIVE from client. IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE); @@ -276,7 +279,7 @@ public class IndexTool extends Configured implements Tool { * @return * @throws Exception */ - private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception { + private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, boolean isLocalIndexBuild) throws Exception { job.setMapperClass(PhoenixIndexImportMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); @@ -285,11 +288,24 @@ public class IndexTool extends Configured implements Tool { PhoenixConfigurationUtil.getPhysicalTableName(configuration); final HTable htable = new HTable(configuration, physicalIndexTable); HFileOutputFormat.configureIncrementalLoad(job, htable); + byte[][] splitKeysBeforeJob = null; + if(isLocalIndexBuild) { + splitKeysBeforeJob = htable.getStartKeys(); + } boolean status = job.waitForCompletion(true); if (!status) { LOG.error("IndexTool job failed!"); htable.close(); throw new Exception("IndexTool job failed: " + job.toString()); + } else { + if (isLocalIndexBuild + && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getStartKeys())) { + String errMsg = "The index to build is 
local index and the split keys are not matching" + + " before and after running the job. Please rerun the job otherwise" + + " there may be inconsistencies between actual data and index data"; + LOG.error(errMsg); + throw new Exception(errMsg); + } } LOG.info("Loading HFiles from {}", outputPath); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aafa0796/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 79a5275..45d9e43 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -674,4 +674,18 @@ public class IndexUtil { mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, HConstants.NO_NONCE); } + + public static boolean matchingSplitKeys(byte[][] splitKeys1, byte[][] splitKeys2) throws IOException { + if (splitKeys1 != null && splitKeys2 != null + && splitKeys1.length == splitKeys2.length) { + for (int i = 0; i < splitKeys1.length; i++) { + if (Bytes.compareTo(splitKeys1[i], splitKeys2[i]) != 0) { + return false; + } + } + } else { + return false; + } + return true; + } }