This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.14-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 9e10faae49326fd54561f4b6972d67476bd85963
Author: Chinmay Kulkarni <chinmayskulka...@gmail.com>
AuthorDate: Thu Mar 14 23:16:14 2019 -0700

    PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
---
 .../UngroupedAggregateRegionObserver.java          |  6 ++-
 .../hbase/index/write/RecoveryIndexWriter.java     | 10 ++--
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    | 15 ++----
 .../apache/phoenix/mapreduce/OrphanViewTool.java   | 53 ++++++++++-----------
 .../phoenix/mapreduce/PhoenixRecordWriter.java     | 18 +++++--
 .../mapreduce/index/DirectHTableWriter.java        | 14 +++++-
 .../mapreduce/index/IndexScrutinyMapper.java       | 24 ++++++++--
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 55 ++++++++++++++++------
 .../index/PhoenixIndexImportDirectMapper.java      | 26 +++++-----
 .../mapreduce/index/PhoenixIndexImportMapper.java  | 16 ++++---
 .../index/PhoenixIndexPartialBuildMapper.java      | 25 ++++++----
 .../mapreduce/util/PhoenixConfigurationUtil.java   | 45 +++++++++---------
 .../apache/phoenix/parse/DropTableStatement.java   |  4 +-
 13 files changed, 190 insertions(+), 121 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 2eb15a1..f0ce5b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -817,7 +817,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
             try {
                 if (targetHTable != null) {
-                    targetHTable.close();
+                    try {
+                        targetHTable.close();
+                    } catch (IOException e) {
+                        logger.error("Closing table: " + targetHTable + " failed: ", e);
+                    }
                 }
             } finally {
                 try {
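
The added inner try/catch logs a failed targetHTable.close() instead of letting it propagate, where it could mask an exception already in flight or skip the cleanup that follows in the finally block. A minimal standalone sketch of the idea (class and logging are illustrative, not Phoenix code):

    import java.io.Closeable;
    import java.io.IOException;

    final class QuietClose {
        // Close 'resource' without letting a close failure replace an
        // exception that may already be propagating from the try block.
        static void closeLogging(Closeable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (IOException e) {
                System.err.println("Closing " + resource + " failed: " + e);
            }
        }
    }
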
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index 35f0a6d..fb96666 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -26,8 +26,6 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -55,15 +53,13 @@ public class RecoveryIndexWriter extends IndexWriter {
     * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup
      * before calling.
      * 
-     * @param committer
      * @param policy
      * @param env
+     * @param name
      * @throws IOException
-     * @throws ZooKeeperConnectionException
-     * @throws MasterNotRunningException
      */
    public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name)
-            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+            throws IOException {
         super(new TrackingParallelWriterIndexCommitter(), policy, env, name);
         this.admin = new HBaseAdmin(env.getConfiguration());
     }
@@ -125,7 +121,7 @@ public class RecoveryIndexWriter extends IndexWriter {
             try {
                 admin.close();
             } catch (IOException e) {
-                // closing silently
+                LOG.error("Closing the admin failed: ", e);
             }
         }
     }
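
Dropping MasterNotRunningException and ZooKeeperConnectionException from the constructor's throws clause is safe because, in HBase 1.x, both are subclasses of IOException, so the single IOException clause already covers them. The same narrowing in a tiny illustrative sketch (hypothetical types, not the HBase hierarchy itself):

    import java.io.IOException;

    // Hypothetical subtype standing in for MasterNotRunningException /
    // ZooKeeperConnectionException, which extend IOException in HBase 1.x.
    class SpecificIOException extends IOException {
    }

    final class NarrowedThrowsSketch {
        // Declaring only IOException still lets the more specific subtype
        // escape at runtime; the wider throws list was redundant.
        NarrowedThrowsSketch() throws IOException {
            throw new SpecificIOException();
        }
    }
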
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index f717647..4561152 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -37,20 +36,17 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
-import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -62,7 +58,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -70,7 +65,6 @@ import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
@@ -350,10 +344,11 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
             String tableName = table.getPhysicalName();
             Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
-            HTable htable = new HTable(conf,tableName);
-            LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
-            loader.doBulkLoad(tableOutputPath, htable);
-            LOG.info("Incremental load complete for table=" + tableName);
+            try(HTable htable = new HTable(conf,tableName)) {
+                LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
+                loader.doBulkLoad(tableOutputPath, htable);
+                LOG.info("Incremental load complete for table=" + tableName);
+            }
         }
     }
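
The try-with-resources form above guarantees the HTable is closed whether doBulkLoad() returns or throws, which is exactly the leak the old code had. The same shape in isolation (the Handle type is a stand-in, not the HBase API):

    import java.io.IOException;

    final class TryWithResourcesSketch {
        // Stand-in for org.apache.hadoop.hbase.client.HTable (illustrative).
        static final class Handle implements AutoCloseable {
            void doWork() throws IOException { /* bulk load here */ }
            @Override public void close() throws IOException { /* released */ }
        }

        static void run() throws IOException {
            // close() runs on both the normal and the exceptional path.
            try (Handle htable = new Handle()) {
                htable.doWork();
            }
        }
    }
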
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
index 2e0dd0d..f5ea35a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
@@ -408,14 +408,16 @@ public class OrphanViewTool extends Configured implements Tool {
             } else {
                 tenantConnection = phoenixConnection;
             }
-            String fullViewName = SchemaUtil.getTableName(key.getSchemaName(), key.getTableName());
-            String dropTable = String.format("DROP VIEW IF EXISTS %s CASCADE", fullViewName);
+
+            MetaDataClient client = new MetaDataClient(tenantConnection);
+            org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
+                    .create(key.getSchemaName(), key.getTableName());
             try {
-                tenantConnection.createStatement().execute(dropTable);
-                tenantConnection.commit();
+                client.dropTable(
+                        new DropTableStatement(pTableName, PTableType.VIEW, false, true));
             }
             catch (TableNotFoundException e) {
-                LOG.info("Ignoring view " + fullViewName + " as it has already been dropped");
+                LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
             }
         } finally {
             if (newConn) {
@@ -424,22 +426,6 @@ public class OrphanViewTool extends Configured implements Tool {
         }
     }
 
-    /**
-     * Try closing a connection if it is not null
-     * @param connection connection object
-     * @throws RuntimeException if closing the connection fails
-     */
-    private void tryClosingConnection(Connection connection) {
-        try {
-            if (connection != null) {
-                connection.close();
-            }
-        } catch (SQLException sqlE) {
-            LOG.error("Failed to close connection: ", sqlE);
-            throw new RuntimeException("Failed to close connection with exception: ", sqlE);
-        }
-    }
-
    private void removeLink(PhoenixConnection phoenixConnection, Key src, Key dst, PTable.LinkType linkType) throws Exception {
        String deleteQuery = "DELETE FROM " +
                ((linkType == PTable.LinkType.PHYSICAL_TABLE || linkType == PTable.LinkType.PARENT_TABLE) ? SYSTEM_CATALOG_NAME : SYSTEM_CHILD_LINK_NAME) +
@@ -798,14 +784,7 @@ public class OrphanViewTool extends Configured implements Tool {
     }
 
    private void closeConnectionAndFiles(Connection connection) throws IOException {
-        try {
-            if (connection != null) {
-                connection.close();
-            }
-        } catch (SQLException sqle) {
-            LOG.error("Failed to close connection ", sqle.getMessage());
-            throw new RuntimeException("Failed to close connection");
-        }
+        tryClosingConnection(connection);
         for (byte i = VIEW; i < ORPHAN_TYPE_COUNT; i++) {
             if (writer[i] != null) {
                 writer[i].close();
@@ -817,6 +796,22 @@ public class OrphanViewTool extends Configured implements Tool {
     }
 
     /**
+     * Try closing a connection if it is not null
+     * @param connection connection object
+     * @throws RuntimeException if closing the connection fails
+     */
+    private void tryClosingConnection(Connection connection) {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (SQLException sqlE) {
+            LOG.error("Failed to close connection: ", sqlE);
+            throw new RuntimeException("Failed to close connection with exception: ", sqlE);
+        }
+    }
+
+    /**
      * Examples for input arguments:
      * -c : cleans orphan views
     * -c -op /tmp/ : cleans orphan views and links, and logs their names to the files named Orphan*.txt in /tmp/
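
Taken together, the OrphanViewTool change routes the view drop through MetaDataClient instead of executing a generated "DROP VIEW ... CASCADE" string on the tenant connection. The new call sequence, extracted as a sketch (connection acquisition elided; the boolean arguments are ifExists=false and cascade=true, matching the call above):

    import java.sql.SQLException;

    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.parse.DropTableStatement;
    import org.apache.phoenix.parse.TableName;
    import org.apache.phoenix.schema.MetaDataClient;
    import org.apache.phoenix.schema.PTableType;

    final class DropViewSketch {
        // Drop a view through the metadata client, mirroring the diff above.
        static void dropView(PhoenixConnection conn, String schema, String view)
                throws SQLException {
            MetaDataClient client = new MetaDataClient(conn);
            TableName name = TableName.create(schema, view);
            client.dropTable(new DropTableStatement(name, PTableType.VIEW,
                    false /* ifExists */, true /* cascade */));
        }
    }
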
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 52f2fe3..b67ba74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -52,10 +52,20 @@ public class PhoenixRecordWriter<T extends DBWritable>  extends RecordWriter<Nul
     }
     
    public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
-        this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
-        this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
-        final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
-        this.statement = this.conn.prepareStatement(upsertQuery);
+        Connection connection = null;
+        try {
+            connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
+            this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            this.statement = connection.prepareStatement(upsertQuery);
+            this.conn = connection;
+        } catch (Exception e) {
+            // Only close the connection in case of an exception, so cannot use try-with-resources
+            if (connection != null) {
+                connection.close();
+            }
+            throw e;
+        }
     }
 
     @Override
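
The constructor must keep the connection open on success — it becomes a field used for the writer's lifetime — so try-with-resources would close it too early; instead the connection is closed only on the failure path, as the inline comment notes. The ownership-transfer pattern in isolation (illustrative names):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.function.Supplier;

    final class WriterSketch {
        private final Connection conn;
        private final PreparedStatement statement;

        WriterSketch(Supplier<Connection> connect, String sql) throws SQLException {
            Connection connection = null;
            try {
                connection = connect.get();
                this.statement = connection.prepareStatement(sql);
                this.conn = connection; // ownership transfers to the instance
            } catch (Exception e) {
                if (connection != null) {
                    connection.close(); // failure path: release before rethrowing
                }
                throw e;
            }
        }
    }
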
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index 59b26b2..32d2f3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.mapreduce.index;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
@@ -56,6 +57,7 @@ public class DirectHTableWriter {
             LOG.info("Created table instance for " + tableName);
         } catch (IOException e) {
             LOG.error("IOException : ", e);
+            tryClosingResourceSilently(this.table);
             throw new RuntimeException(e);
         }
     }
@@ -73,7 +75,17 @@ public class DirectHTableWriter {
         return table;
     }
 
+    private void tryClosingResourceSilently(Closeable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (IOException e) {
+                LOG.error("Closing resource: " + res + " failed with error: ", 
e);
+            }
+        }
+    }
+
     public void close() throws IOException {
-        table.close();
+        tryClosingResourceSilently(this.table);
     }
 }
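
DirectHTableWriter now releases a partially-created table when setup fails, and close() reuses the same null-safe helper so callers never see an exception from the release itself. The lifecycle as a standalone sketch (illustrative names):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.Callable;

    final class TableWriterSketch implements Closeable {
        private Closeable table;

        // If any step after the resource is created fails, release it
        // before the failure propagates.
        void setUp(Callable<Closeable> open, Runnable configure) {
            try {
                this.table = open.call();
                configure.run(); // may throw after 'table' exists
            } catch (Exception e) {
                tryClosingSilently(this.table);
                throw new RuntimeException(e);
            }
        }

        private static void tryClosingSilently(Closeable res) {
            if (res == null) {
                return;
            }
            try {
                res.close();
            } catch (IOException e) {
                System.err.println("Closing " + res + " failed: " + e);
            }
        }

        @Override
        public void close() {
            tryClosingSilently(this.table);
        }
    }
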
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 81081bf..c651077 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -149,10 +149,23 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             LOG.info("Target table base query: " + targetTableQuery);
             md5 = MessageDigest.getInstance("MD5");
         } catch (SQLException | NoSuchAlgorithmException e) {
+            tryClosingResourceSilently(this.outputUpsertStmt);
+            tryClosingResourceSilently(this.connection);
+            tryClosingResourceSilently(this.outputConn);
             throw new RuntimeException(e);
         }
     }
 
+    private static void tryClosingResourceSilently(AutoCloseable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (Exception e) {
+                LOG.error("Closing resource: " + res + " failed :", e);
+            }
+        }
+    }
+
     @Override
    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
             throws IOException, InterruptedException {
@@ -180,18 +193,21 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
         super.cleanup(context);
+        tryClosingResourceSilently(this.outputUpsertStmt);
+        IOException throwException = null;
         if (connection != null) {
             try {
                 processBatch(context);
                 connection.close();
-                if (outputConn != null) {
-                    outputConn.close();
-                }
             } catch (SQLException e) {
                 LOG.error("Error while closing connection in the 
PhoenixIndexMapper class ", e);
-                throw new IOException(e);
+                throwException = new IOException(e);
             }
         }
+        tryClosingResourceSilently(this.outputConn);
+        if (throwException != null) {
+            throw throwException;
+        }
     }
 
     private void processBatch(Context context)
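
cleanup() now attempts every close even when an earlier one fails: the upsert statement is closed silently first, a failure closing the input connection is captured rather than thrown immediately, the output connection is then closed, and only at the end is the captured failure rethrown. The same logic as a sketch (illustrative signature):

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    final class CleanupSketch {
        // Attempt every close; defer the first real failure to the end so
        // one failing close cannot leak the remaining resources.
        static void cleanup(Statement stmt, Connection in, Connection out)
                throws IOException {
            closeSilently(stmt);
            IOException deferred = null;
            if (in != null) {
                try {
                    in.close();
                } catch (SQLException e) {
                    deferred = new IOException(e);
                }
            }
            closeSilently(out);
            if (deferred != null) {
                throw deferred;
            }
        }

        private static void closeSilently(AutoCloseable res) {
            if (res == null) {
                return;
            }
            try {
                res.close();
            } catch (Exception e) {
                System.err.println("Closing " + res + " failed: " + e);
            }
        }
    }
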
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index a665a91..c91f53a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -267,6 +267,12 @@ public class IndexTool extends Configured implements Tool {
 
         }
 
+        void closeConnection() throws SQLException {
+            if (this.connection != null) {
+                this.connection.close();
+            }
+        }
+
         public Job getJob() throws Exception {
             if (isPartialBuild) {
                 return configureJobForPartialBuild();
@@ -514,10 +520,10 @@ public class IndexTool extends Configured implements Tool {
            final Configuration configuration = job.getConfiguration();
            final String physicalIndexTable =
                    PhoenixConfigurationUtil.getPhysicalTableName(configuration);
-            final HTable htable = new HTable(configuration, physicalIndexTable);
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
+            try(final HTable htable = new HTable(configuration, physicalIndexTable)) {
+                HFileOutputFormat.configureIncrementalLoad(job, htable);
+            }
             return job;
-               
         }
         
         /**
@@ -525,9 +531,6 @@ public class IndexTool extends Configured implements Tool {
          * waits for the job completion based on runForeground parameter.
          * 
          * @param job
-         * @param outputPath
-         * @param runForeground - if true, waits for job completion, else submits and returns
-         *            immediately.
          * @return
          * @throws Exception
          */
@@ -560,6 +563,7 @@ public class IndexTool extends Configured implements Tool {
     public int run(String[] args) throws Exception {
         Connection connection = null;
         HTable htable = null;
+        JobFactory jobFactory = null;
         try {
             CommandLine cmdLine = null;
             try {
@@ -574,13 +578,14 @@ public class IndexTool extends Configured implements Tool {
                 tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
                 configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
-            connection = ConnectionUtil.getInputConnection(configuration);
             schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
             dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
             indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
            isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
            qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+            try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
+                pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
+            }
             useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
            String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
            boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -607,7 +612,6 @@ public class IndexTool extends Configured implements Tool {
                 }
                htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices()
                         .getTable(pIndexTable.getPhysicalName().getBytes());
-
                 if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
                     isLocalIndexBuild = true;
                    splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
@@ -635,7 +639,8 @@ public class IndexTool extends Configured implements Tool {
                                fs.delete(outputPath, true);
                        }
 
-                       job = new JobFactory(connection, configuration, outputPath).getJob();
+            jobFactory = new JobFactory(connection, configuration, outputPath);
+            job = jobFactory.getJob();
 
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and 
exit");
@@ -668,16 +673,36 @@ public class IndexTool extends Configured implements Tool {
                    + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
             return -1;
         } finally {
+            boolean rethrowException = false;
             try {
                 if (connection != null) {
-                    connection.close();
+                    try {
+                        connection.close();
+                    } catch (SQLException e) {
+                        LOG.error("Failed to close connection ", e);
+                        rethrowException = true;
+                    }
                 }
                 if (htable != null) {
-                    htable.close();
+                    try {
+                        htable.close();
+                    } catch (IOException e) {
+                        LOG.error("Failed to close htable ", e);
+                        rethrowException = true;
+                    }
+                }
+                if (jobFactory != null) {
+                    try {
+                        jobFactory.closeConnection();
+                    } catch (SQLException e) {
+                        LOG.error("Failed to close jobFactory ", e);
+                        rethrowException = true;
+                    }
+                }
+            } finally {
+                if (rethrowException) {
+                    throw new RuntimeException("Failed to close resource");
                 }
-            } catch (SQLException sqle) {
-                LOG.error("Failed to close connection ", sqle.getMessage());
-                throw new RuntimeException("Failed to close connection");
             }
         }
     }
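
run()'s finally block now closes the JDBC connection, the HTable, and the JobFactory's connection independently: each failure is logged and flagged, every close is still attempted, and a single RuntimeException is raised at the end if anything failed. Generalized as a sketch (illustrative helper, not Phoenix code):

    final class CloseAllSketch {
        // Close several unrelated resources; no single failure may skip
        // the others, and the caller learns about failures exactly once.
        static void closeAll(AutoCloseable... resources) {
            boolean rethrow = false;
            for (AutoCloseable res : resources) {
                if (res == null) {
                    continue;
                }
                try {
                    res.close();
                } catch (Exception e) {
                    System.err.println("Failed to close " + res + ": " + e);
                    rethrow = true;
                }
            }
            if (rethrow) {
                throw new RuntimeException("Failed to close resource");
            }
        }
    }
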
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index eb4bc0e..7328014 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -103,7 +103,8 @@ public class PhoenixIndexImportDirectMapper extends
            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
             this.pStatement = connection.prepareStatement(upsertQuery);
 
-        } catch (SQLException e) {
+        } catch (Exception e) {
+            tryClosingResources();
             throw new RuntimeException(e);
         }
     }
@@ -176,17 +177,20 @@ public class PhoenixIndexImportDirectMapper extends
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
             throw new RuntimeException(e);
         } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Error {} while closing connection in the 
PhoenixIndexMapper class ",
-                        e.getMessage());
-                }
-            }
-            if (writer != null) {
-                writer.close();
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the 
PhoenixIndexMapper class ", e);
             }
         }
+        if (this.writer != null) {
+            this.writer.close();
+        }
     }
 }
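
In this mapper (and PhoenixIndexPartialBuildMapper below), the setup failure path and the normal cleanup now funnel through one tryClosingResources() method, so the connection and writer are released the same way on every exit. The lifecycle shape, sketched without the Hadoop Mapper plumbing (illustrative names):

    import java.io.Closeable;
    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.SQLException;

    abstract class MapperLifecycleSketch {
        private Connection connection;
        private Closeable writer;

        // A half-initialized mapper releases everything through the same
        // helper that cleanup() uses, so setup failures cannot leak.
        protected final void setup() throws IOException {
            try {
                this.connection = openConnection();
                this.writer = openWriter(); // may fail after connection exists
            } catch (Exception e) {
                tryClosingResources();
                throw new RuntimeException(e);
            }
        }

        protected final void cleanup() throws IOException {
            tryClosingResources();
        }

        private void tryClosingResources() throws IOException {
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (SQLException e) {
                    System.err.println("Error while closing connection: " + e);
                }
            }
            if (this.writer != null) {
                this.writer.close(); // an IOException here still propagates
            }
        }

        protected abstract Connection openConnection() throws SQLException;
        protected abstract Closeable openWriter() throws IOException;
    }
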
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 9e0d629..e060bc3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -83,6 +83,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
             this.pStatement = connection.prepareStatement(upsertQuery);
             
         } catch (SQLException e) {
+            tryClosingConnection();
             throw new RuntimeException(e.getMessage());
         } 
     }
@@ -125,14 +126,17 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
 
     @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
-         super.cleanup(context);
-         if (connection != null) {
-             try {
+        super.cleanup(context);
+        tryClosingConnection();
+    }
+
+    private void tryClosingConnection() {
+        if (connection != null) {
+            try {
                 connection.close();
             } catch (SQLException e) {
-                LOG.error("Error {} while closing connection in the 
PhoenixIndexMapper class ",
-                        e.getMessage());
+                LOG.error("Error while closing connection in the 
PhoenixIndexMapper class ", e);
             }
-         }
+        }
     }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index f4ecac2..2077137 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -96,6 +96,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
             this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
            maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
         } catch (SQLException e) {
+            tryClosingResources();
             throw new RuntimeException(e.getMessage());
         } 
     }
@@ -170,17 +171,21 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
             throw new RuntimeException(e);
         } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Error {} while closing connection in the 
PhoenixIndexMapper class ",
-                        e.getMessage());
-                }
-            }
-            if (writer != null) {
-                writer.close();
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the 
PhoenixIndexMapper class ", e);
             }
         }
+        if (this.writer != null) {
+            this.writer.close();
+        }
     }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 6788e5f..0ff7904 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -303,18 +303,20 @@ public final class PhoenixConfigurationUtil {
         }
         final String tableName = getOutputTableName(configuration);
         Preconditions.checkNotNull(tableName);
-        final Connection connection = ConnectionUtil.getOutputConnection(configuration);
-        List<String> upsertColumnList = PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
-        if(!upsertColumnList.isEmpty()) {
-            LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s, upsertColumnList=%s "
-                    ,!upsertColumnList.isEmpty(), upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)
-                    ));
-        } 
-       columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
-       // we put the encoded column infos in the Configuration for re usability.
-       ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); 
-       connection.close();
-       return columnMetadataList;
+        try (final Connection connection = ConnectionUtil.getOutputConnection(configuration)) {
+            List<String> upsertColumnList =
+                    PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
+            if(!upsertColumnList.isEmpty()) {
+                LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s,"
+                                + " upsertColumnList=%s ",!upsertColumnList.isEmpty(),
+                        upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)));
+            }
+            columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName,
+                    upsertColumnList);
+            // we put the encoded column infos in the Configuration for re usability.
+            ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+        }
+               return columnMetadataList;
     }
     
     public static String getUpsertStatement(final Configuration configuration) throws SQLException {
@@ -361,12 +363,13 @@ public final class PhoenixConfigurationUtil {
         if (tenantId != null) {
             props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
-        final Connection connection = ConnectionUtil.getInputConnection(configuration, props);
-        final List<String> selectColumnList = getSelectColumnList(configuration);
-        columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
-        // we put the encoded column infos in the Configuration for re usability.
-        ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
-        connection.close();
+        try (final Connection connection = ConnectionUtil.getInputConnection(configuration, props)){
+            final List<String> selectColumnList = getSelectColumnList(configuration);
+            columnMetadataList =
+                    PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+            // we put the encoded column infos in the Configuration for re usability.
+            ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+        }
         return columnMetadataList;
     }
 
@@ -401,9 +404,9 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
        long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
         if(batchSize <= 0) {
-           Connection conn = ConnectionUtil.getOutputConnection(configuration);
-           batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
-           conn.close();
+           try (Connection conn = ConnectionUtil.getOutputConnection(configuration)) {
+               batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+           }
         }
         configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
         return batchSize;
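
The connections in PhoenixConfigurationUtil are all short-lived — opened, used to derive a single value, and discarded — so try-with-resources replaces manual close() calls that leaked whenever generateColumnInfo() or getMutateBatchSize() threw first. The general shape with plain JDBC (illustrative URL and value):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    final class ShortLivedConnectionSketch {
        // Derive one value from a throwaway connection; the connection is
        // closed even if the work in between throws.
        static int fetchBatchSize(String jdbcUrl) throws SQLException {
            try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
                return queryBatchSize(conn);
            }
        }

        private static int queryBatchSize(Connection conn) throws SQLException {
            return 1000; // stand-in for PhoenixConnection.getMutateBatchSize()
        }
    }
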
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
index 997b695..7cadb19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
@@ -25,9 +25,9 @@ public class DropTableStatement extends MutableStatement {
     private final boolean ifExists;
     private final PTableType tableType;
     private final boolean cascade;
-    
 
-    protected DropTableStatement(TableName tableName, PTableType tableType, boolean ifExists, boolean cascade) {
+
+    public DropTableStatement(TableName tableName, PTableType tableType, boolean ifExists, boolean cascade) {
         this.tableName = tableName;
         this.tableType = tableType;
         this.ifExists = ifExists;
