Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 d830ddfc6 -> aafa07962


PHOENIX-2926 Skip loading data for table having local indexes when there is 
split during bulkload job - addendum (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aafa0796
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aafa0796
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aafa0796

Branch: refs/heads/4.x-HBase-1.0
Commit: aafa07962d9f129ad0f04f147f91c454b854a3da
Parents: d830ddf
Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Authored: Fri Jul 29 00:07:37 2016 +0530
Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Committed: Fri Jul 29 00:07:37 2016 +0530

----------------------------------------------------------------------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 32 ++++++--------------
 .../phoenix/mapreduce/index/IndexTool.java      | 20 ++++++++++--
 .../java/org/apache/phoenix/util/IndexUtil.java | 14 +++++++++
 3 files changed, 42 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/aafa0796/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index a41de8a..f7b7d22 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -59,6 +60,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -309,33 +311,19 @@ public abstract class AbstractBulkLoadTool extends 
Configured implements Tool {
 
         if (success) {
             if (hasLocalIndexes) {
-                byte[][] splitKeysAfterJob = null;
                 try {
                     table = new HTable(job.getConfiguration(), 
qualifiedTableName);
-                    splitKeysAfterJob = table.getStartKeys();
+                    if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, 
table.getStartKeys())) {
+                        LOG.error("The table "
+                                + qualifiedTableName
+                                + " has local indexes and there is split key 
mismatch before and"
+                                + " after running bulkload job. Please rerun 
the job otherwise"
+                                + " there may be inconsistencies between 
actual data and index data.");
+                        return -1;
+                    }
                 } finally {
                     if (table != null) table.close();
                 }
-                boolean matchingSplitKeys = true;
-                if (splitKeysBeforeJob != null && splitKeysAfterJob != null
-                        && splitKeysBeforeJob.length == 
splitKeysAfterJob.length) {
-                    for (int i = 0; i < splitKeysBeforeJob.length; i++) {
-                        if (Bytes.compareTo(splitKeysBeforeJob[i], 
splitKeysAfterJob[i]) != 0) {
-                            matchingSplitKeys = false;
-                            break;
-                        }
-                    }
-                } else {
-                    matchingSplitKeys = false;
-                }
-                if(!matchingSplitKeys) {
-                    LOG.error("The table "
-                            + qualifiedTableName
-                            + " has local indexes and there is split key 
mismatch before and"
-                            + " after running bulkload job. Please rerun the 
job otherwise"
-                            + " there may be inconsistencies between actual 
data and index data.");
-                    return -1;
-                }
             }
             LOG.info("Loading HFiles from {}", outputPath);
             completebulkload(conf,outputPath,tablesToBeLoaded);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aafa0796/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 34c9013..8488123 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -63,6 +63,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -205,8 +206,10 @@ public class IndexTool extends Configured implements Tool {
             // check if the index type is LOCAL, if so, derive and set the 
physicalIndexName that is
             // computed from the qDataTable name.
             String physicalIndexTable = 
pindexTable.getPhysicalName().getString();
+            boolean isLocalIndexBuild = false;
             if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
                 physicalIndexTable = qDataTable;
+                isLocalIndexBuild = true;
             }
 
             final PhoenixConnection pConnection = 
connection.unwrap(PhoenixConnection.class);
@@ -247,7 +250,7 @@ public class IndexTool extends Configured implements Tool {
                 configureSubmittableJobUsingDirectApi(job, outputPath,
                     cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()));
             } else {
-                configureRunnableJobUsingBulkLoad(job, outputPath);
+                configureRunnableJobUsingBulkLoad(job, outputPath, 
isLocalIndexBuild);
                 // Without direct API, we need to update the index state to 
ACTIVE from client.
                 IndexToolUtil.updateIndexState(connection, qDataTable, 
indexTable,
                         PIndexState.ACTIVE);
@@ -276,7 +279,7 @@ public class IndexTool extends Configured implements Tool {
      * @return
      * @throws Exception
      */
-    private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath) 
throws Exception {
+    private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, 
boolean isLocalIndexBuild) throws Exception {
         job.setMapperClass(PhoenixIndexImportMapper.class);
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
         job.setMapOutputValueClass(KeyValue.class);
@@ -285,11 +288,24 @@ public class IndexTool extends Configured implements Tool 
{
                 PhoenixConfigurationUtil.getPhysicalTableName(configuration);
         final HTable htable = new HTable(configuration, physicalIndexTable);
         HFileOutputFormat.configureIncrementalLoad(job, htable);
+        byte[][] splitKeysBeforeJob = null;
+        if(isLocalIndexBuild) {
+            splitKeysBeforeJob = htable.getStartKeys();
+        }
         boolean status = job.waitForCompletion(true);
         if (!status) {
             LOG.error("IndexTool job failed!");
             htable.close();
             throw new Exception("IndexTool job failed: " + job.toString());
+        } else {
+            if (isLocalIndexBuild
+                    && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, 
htable.getStartKeys())) {
+                String errMsg = "The index to build is local index and the 
split keys are not matching"
+                        + " before and after running the job. Please rerun the 
job otherwise"
+                        + " there may be inconsistencies between actual data 
and index data";
+                LOG.error(errMsg);
+                throw new Exception(errMsg);
+            }
         }
 
         LOG.info("Loading HFiles from {}", outputPath);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aafa0796/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 79a5275..45d9e43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -674,4 +674,18 @@ public class IndexUtil {
             mutations.toArray(new Mutation[mutations.size()]),
             HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
+
+    public static boolean matchingSplitKeys(byte[][] splitKeys1, byte[][] 
splitKeys2) throws IOException {
+        if (splitKeys1 != null && splitKeys2 != null
+                && splitKeys1.length == splitKeys2.length) {
+            for (int i = 0; i < splitKeys1.length; i++) {
+                if (Bytes.compareTo(splitKeys1[i], splitKeys2[i]) != 0) {
+                    return false;
+                }
+            }
+        } else {
+            return false;
+        }
+        return true;
+    }
 }

Reply via email to