Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 e2b4e57d7 -> d3204f7d1


PHOENIX-3061 IndexTool marks index as ACTIVE and exit 0 even if bulkload has 
error (addendum) (Simon Wang)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d3204f7d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d3204f7d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d3204f7d

Branch: refs/heads/4.x-HBase-1.1
Commit: d3204f7d133ecabe84991e36803a9c3f1a2d8507
Parents: e2b4e57
Author: Simon Wang <simon.w...@airbnb.com>
Authored: Tue Jul 12 11:38:21 2016 -0700
Committer: Thomas D'Silva <tdsi...@salesforce.com>
Committed: Tue Jul 12 22:11:14 2016 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/IndexToolIT.java | 14 ++++-
 .../phoenix/mapreduce/index/IndexTool.java      | 16 +++---
 .../phoenix/mapreduce/index/IndexToolUtil.java  |  8 ++-
 .../index/PhoenixIndexImportDirectReducer.java  | 60 ++++++++++++++++++++
 .../index/PhoenixIndexImportMapper.java         |  7 ++-
 .../index/PhoenixIndexToolReducer.java          | 60 --------------------
 6 files changed, 90 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3204f7d/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index cb013c8..c66fea3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -37,6 +37,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -126,11 +127,16 @@ public class IndexToolIT extends 
BaseOwnClusterHBaseManagedTimeIT {
             if (transactional) {
                 // insert two rows in another connection without committing so 
that they are not visible to other transactions
                 try (Connection conn2 = DriverManager.getConnection(getUrl(), 
props)) {
-                    PreparedStatement stmt2 = 
conn.prepareStatement(upsertQuery);
+                    conn2.setAutoCommit(false);
+                    PreparedStatement stmt2 = 
conn2.prepareStatement(upsertQuery);
                     upsertRow(stmt2, 5);
                     upsertRow(stmt2, 6);
                     ResultSet rs = conn.createStatement().executeQuery("SELECT 
count(*) from "+fullTableName);
                     assertTrue(rs.next());
+                    assertEquals("Unexpected row count ", 2, rs.getInt(1));
+                    assertFalse(rs.next());
+                    rs = conn2.createStatement().executeQuery("SELECT count(*) 
from "+fullTableName);
+                    assertTrue(rs.next());
                     assertEquals("Unexpected row count ", 4, rs.getInt(1));
                     assertFalse(rs.next());
                 }
@@ -156,8 +162,10 @@ public class IndexToolIT extends 
BaseOwnClusterHBaseManagedTimeIT {
            
             //run the index MR job.
             final IndexTool indexingTool = new IndexTool();
-            indexingTool.setConf(new 
Configuration(getUtility().getConfiguration()));
-            
+            Configuration conf = new 
Configuration(getUtility().getConfiguration());
+            conf.set(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
+            indexingTool.setConf(conf);
+
             final String[] cmdArgs = getArgValues(schemaName, dataTable, 
indxTable, directApi);
             int status = indexingTool.run(cmdArgs);
             assertEquals(0, status);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3204f7d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 1b1f0fb..34c9013 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -244,15 +244,13 @@ public class IndexTool extends Configured implements Tool 
{
             
             boolean useDirectApi = 
cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
             if (useDirectApi) {
-                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
                 configureSubmittableJobUsingDirectApi(job, outputPath,
                     cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()));
             } else {
-                job.setMapperClass(PhoenixIndexImportMapper.class);
                 configureRunnableJobUsingBulkLoad(job, outputPath);
-                // finally update the index state to ACTIVE.
+                // Without direct API, we need to update the index state to 
ACTIVE from client.
                 IndexToolUtil.updateIndexState(connection, qDataTable, 
indexTable,
-                    PIndexState.ACTIVE);
+                        PIndexState.ACTIVE);
             }
             return 0;
         } catch (Exception ex) {
@@ -265,7 +263,7 @@ public class IndexTool extends Configured implements Tool {
                     connection.close();
                 }
             } catch (SQLException sqle) {
-                LOG.error(" Failed to close connection ", sqle.getMessage());
+                LOG.error("Failed to close connection ", sqle.getMessage());
                 throw new RuntimeException("Failed to close connection");
             }
         }
@@ -279,6 +277,7 @@ public class IndexTool extends Configured implements Tool {
      * @throws Exception
      */
     private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath) 
throws Exception {
+        job.setMapperClass(PhoenixIndexImportMapper.class);
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
         job.setMapOutputValueClass(KeyValue.class);
         final Configuration configuration = job.getConfiguration();
@@ -288,7 +287,7 @@ public class IndexTool extends Configured implements Tool {
         HFileOutputFormat.configureIncrementalLoad(job, htable);
         boolean status = job.waitForCompletion(true);
         if (!status) {
-            LOG.error("Failed to run the IndexTool job.");
+            LOG.error("IndexTool job failed!");
             htable.close();
             throw new Exception("IndexTool job failed: " + job.toString());
         }
@@ -314,6 +313,8 @@ public class IndexTool extends Configured implements Tool {
      */
     private void configureSubmittableJobUsingDirectApi(Job job, Path 
outputPath, boolean runForeground)
             throws Exception {
+        job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+        job.setReducerClass(PhoenixIndexImportDirectReducer.class);
         Configuration conf = job.getConfiguration();
         HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
         // Set the Physical Table name for use in 
DirectHTableWriter#write(Mutation)
@@ -322,7 +323,6 @@ public class IndexTool extends Configured implements Tool {
         
         //Set the Output classes
         job.setMapOutputValueClass(IntWritable.class);
-        job.setReducerClass(PhoenixIndexToolReducer.class);
         job.setOutputKeyClass(NullWritable.class);
         job.setOutputValueClass(NullWritable.class);
         TableMapReduceUtil.addDependencyJars(job);
@@ -336,7 +336,7 @@ public class IndexTool extends Configured implements Tool {
         LOG.info("Running Index Build in Foreground. Waits for the build to 
complete. This may take a long time!.");
         boolean result = job.waitForCompletion(true);
         if (!result) {
-            LOG.error("Job execution failed!");
+            LOG.error("IndexTool job failed!");
             throw new Exception("IndexTool job failed: " + job.toString());
         }
         FileSystem.get(conf).delete(outputPath, true);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3204f7d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
index 96e711c..f955e6b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
@@ -19,11 +19,15 @@ package org.apache.phoenix.mapreduce.index;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +52,9 @@ public class IndexToolUtil {
        public static void updateIndexState(Configuration 
configuration,PIndexState state) throws SQLException {
                final String masterTable = 
PhoenixConfigurationUtil.getInputTableName(configuration);
                final String indexTable = 
PhoenixConfigurationUtil.getOutputTableName(configuration);
-               final Connection connection = 
ConnectionUtil.getOutputConnection(configuration);
+               final Properties overrideProps = new Properties();
+               overrideProps.setProperty(QueryServices.TRANSACTIONS_ENABLED, 
configuration.get(QueryServices.TRANSACTIONS_ENABLED));
+               final Connection connection = 
ConnectionUtil.getOutputConnection(configuration, overrideProps);
                try {
                        updateIndexState(connection, masterTable, indexTable , 
state);
                } finally {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3204f7d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
new file mode 100644
index 0000000..51b88c1
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.schema.PIndexState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reducer class that does only one task and that is to update the index state 
of the table.
+ */
+public class PhoenixIndexImportDirectReducer extends
+        Reducer<ImmutableBytesWritable, IntWritable, NullWritable, 
NullWritable> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class);
+    private Configuration configuration;
+
+    /**
+     * Called once at the start of the task.
+     */
+    @Override
+    protected void setup(Context context) throws IOException, 
InterruptedException {
+        configuration = context.getConfiguration();
+    }
+
+    @Override
+    protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> 
arg1,
+            Reducer<ImmutableBytesWritable, IntWritable, NullWritable, 
NullWritable>.Context arg2)
+            throws IOException, InterruptedException {
+        try {
+            IndexToolUtil.updateIndexState(configuration, PIndexState.ACTIVE);
+        } catch (SQLException e) {
+            LOG.error(" Failed to update the status to Active");
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3204f7d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 093b93d..7551527 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -117,7 +117,7 @@ public class PhoenixIndexImportMapper extends 
Mapper<NullWritable, PhoenixIndexD
             }
             connection.rollback();
        } catch (SQLException e) {
-           LOG.error(" Error {}  while read/write of a record 
",e.getMessage());
+           LOG.error("Error {}  while read/write of a record ",e.getMessage());
            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
            throw new RuntimeException(e);
         } 
@@ -126,11 +126,12 @@ public class PhoenixIndexImportMapper extends 
Mapper<NullWritable, PhoenixIndexD
     @Override
     protected void cleanup(Context context) throws IOException, 
InterruptedException {
          super.cleanup(context);
-         if(connection != null) {
+         if (connection != null) {
              try {
                 connection.close();
             } catch (SQLException e) {
-                LOG.error("Error {} while closing connection in the 
PhoenixIndexMapper class ",e.getMessage());
+                LOG.error("Error {} while closing connection in the 
PhoenixIndexMapper class ",
+                        e.getMessage());
             }
          }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3204f7d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java
deleted file mode 100644
index 6df0ee1..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce.index;
-
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.schema.PIndexState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Reducer class that does only one task and that is to update the index state 
of the table.
- */
-public class PhoenixIndexToolReducer extends
-        Reducer<ImmutableBytesWritable, IntWritable, NullWritable, 
NullWritable> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(PhoenixIndexToolReducer.class);
-    private Configuration configuration;
-
-    /**
-     * Called once at the start of the task.
-     */
-    @Override
-    protected void setup(Context context) throws IOException, 
InterruptedException {
-        configuration = context.getConfiguration();
-    }
-
-    @Override
-    protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> 
arg1,
-            Reducer<ImmutableBytesWritable, IntWritable, NullWritable, 
NullWritable>.Context arg2)
-            throws IOException, InterruptedException {
-        try {
-            IndexToolUtil.updateIndexState(configuration, PIndexState.ACTIVE);
-        } catch (SQLException e) {
-            LOG.error(" Failed to update the status to Active");
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-}

Reply via email to