This is an automated email from the ASF dual-hosted git repository.

pboado pushed a commit to branch 5.x-cdh6
in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 02ec0400a9d7c9fb1bff5fff9d339694b1786195
Author: Chinmay Kulkarni <chinmayskulka...@gmail.com>
AuthorDate: Fri Mar 8 23:31:17 2019 +0000

    PHOENIX-5184 HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
---
 .../UngroupedAggregateRegionObserver.java          |  22 +++-
 .../hbase/index/write/RecoveryIndexWriter.java     |  30 ++++--
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    | 114 +++++++++++----------
 .../apache/phoenix/mapreduce/OrphanViewTool.java   |  73 ++++++++-----
 .../phoenix/mapreduce/PhoenixRecordWriter.java     |  18 +++-
 .../mapreduce/index/DirectHTableWriter.java        |  19 +++-
 .../mapreduce/index/IndexScrutinyMapper.java       |  25 ++++-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  85 +++++++++++----
 .../index/PhoenixIndexImportDirectMapper.java      |  26 +++--
 .../mapreduce/index/PhoenixIndexImportMapper.java  |  16 +--
 .../index/PhoenixIndexPartialBuildMapper.java      |  25 +++--
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  45 ++++----
 12 files changed, 325 insertions(+), 173 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 3be4d36..6b27a88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -29,6 +29,7 @@ import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CON
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -475,13 +477,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] deleteCF = null;
         byte[] emptyCF = null;
         Table targetHTable = null;
+        Connection targetHConn = null;
         boolean isPKChanging = false;
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         if (upsertSelectTable != null) {
             isUpsert = true;
             projectedTable = deserializeTable(upsertSelectTable);
-            targetHTable =
-                    ConnectionFactory.createConnection(upsertSelectConfig).getTable(
+            targetHConn = ConnectionFactory.createConnection(upsertSelectConfig);
+            targetHTable = targetHConn.getTable(
                     TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
             selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
@@ -852,9 +855,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 }
             }
             try {
-                if (targetHTable != null) {
-                    targetHTable.close();
-                }
+                tryClosingResourceSilently(targetHTable);
+                tryClosingResourceSilently(targetHConn);
             } finally {
                 try {
                     innerScanner.close();
@@ -900,6 +902,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     }
 
+    private static void tryClosingResourceSilently(Closeable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (IOException e) {
+                logger.error("Closing resource: " + res + " failed: ", e);
+            }
+        }
+    }
+
     private void checkForLocalIndexColumnFamilies(Region region,
             List<IndexMaintainer> indexMaintainers) throws IOException {
         TableDescriptor tableDesc = region.getTableDescriptor();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index a1a917c..fefb812 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -26,10 +26,9 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,17 +56,25 @@ public class RecoveryIndexWriter extends IndexWriter {
      * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup
      * before calling.
      *
-     * @param committer
      * @param policy
      * @param env
+     * @param name
      * @throws IOException
-     * @throws ZooKeeperConnectionException
-     * @throws MasterNotRunningException
      */
     public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name)
-            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+            throws IOException {
         super(new TrackingParallelWriterIndexCommitter(), policy, env, name);
-        this.admin = ConnectionFactory.createConnection(env.getConfiguration()).getAdmin();
+        Connection hConn = null;
+        try {
+            hConn = ConnectionFactory.createConnection(env.getConfiguration());
+            this.admin = hConn.getAdmin();
+        } catch (Exception e) {
+            // Close the connection only if an exception occurs
+            if (hConn != null) {
+                hConn.close();
+            }
+            throw e;
+        }
     }
 
     @Override
@@ -124,10 +131,17 @@ public class RecoveryIndexWriter extends IndexWriter {
     public void stop(String why) {
         super.stop(why);
         if (admin != null) {
+            if (admin.getConnection() != null) {
+                try {
+                    admin.getConnection().close();
+                } catch (IOException e) {
+                    LOG.error("Closing the connection failed: ", e);
+                }
+            }
             try {
                 admin.close();
             } catch (IOException e) {
-                // closing silently
+                LOG.error("Closing the admin failed: ", e);
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index e321361..cc6feb3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -291,57 +291,64 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         job.setOutputValueClass(KeyValue.class);
         job.setReducerClass(FormatToKeyValueReducer.class);
         byte[][] splitKeysBeforeJob = null;
-        org.apache.hadoop.hbase.client.Connection hbaseConn =
-                ConnectionFactory.createConnection(job.getConfiguration());
-        RegionLocator regionLocator = null;
-        if(hasLocalIndexes) {
-            try{
-                regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName));
-                splitKeysBeforeJob = regionLocator.getStartKeys();
-            } finally {
-                if(regionLocator != null )regionLocator.close();
-            }
-        }
-        MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
-
-        final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
-        final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
-
-        job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
-        job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
-
-        // give subclasses their hook
-        setupJob(job);
-
-        LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
-        boolean success = job.waitForCompletion(true);
-
-        if (success) {
-            if (hasLocalIndexes) {
-                try {
-                    regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName));
-                    if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) {
-                        LOG.error("The table "
-                                + qualifiedTableName
-                                + " has local indexes and there is split key mismatch before and"
-                                + " after running bulkload job. Please rerun the job otherwise"
-                                + " there may be inconsistencies between actual data and index data.");
-                        return -1;
-                    }
+        try(org.apache.hadoop.hbase.client.Connection hbaseConn =
+                ConnectionFactory.createConnection(job.getConfiguration())) {
+            RegionLocator regionLocator = null;
+            if(hasLocalIndexes) {
+                try{
+                    regionLocator = hbaseConn.getRegionLocator(
+                            TableName.valueOf(qualifiedTableName));
+                    splitKeysBeforeJob = regionLocator.getStartKeys();
                 } finally {
                     if (regionLocator != null) regionLocator.close();
                 }
             }
-            LOG.info("Loading HFiles from {}", outputPath);
-            completebulkload(conf,outputPath,tablesToBeLoaded);
-            LOG.info("Removing output directory {}", outputPath);
-            if(!outputPath.getFileSystem(conf).delete(outputPath, true)) {
-                LOG.error("Failed to delete the output directory {}", outputPath);
-            }
-            return 0;
-        } else {
-            return -1;
-        }
+            MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
+
+            final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON
+                    .apply(tablesToBeLoaded);
+            final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON
+                    .apply(tablesToBeLoaded);
+
+            job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,
+                    tableNamesAsJson);
+            job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,
+                    logicalNamesAsJson);
+
+            // give subclasses their hook
+            setupJob(job);
+
+            LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
+            boolean success = job.waitForCompletion(true);
+
+            if (success) {
+                if (hasLocalIndexes) {
+                    try {
+                        regionLocator = hbaseConn.getRegionLocator(
+                                TableName.valueOf(qualifiedTableName));
+                        if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob,
+                                regionLocator.getStartKeys())) {
+                            LOG.error("The table " + qualifiedTableName + " has local indexes and"
+                                    + " there is split key mismatch before and after running"
+                                    + " bulkload job. Please rerun the job otherwise there may be"
+                                    + " inconsistencies between actual data and index data.");
+                            return -1;
+                        }
+                    } finally {
+                        if (regionLocator != null) regionLocator.close();
+                    }
+                }
+                LOG.info("Loading HFiles from {}", outputPath);
+                completebulkload(conf,outputPath,tablesToBeLoaded);
+                LOG.info("Removing output directory {}", outputPath);
+                if(!outputPath.getFileSystem(conf).delete(outputPath, true)) {
+                    LOG.error("Failed to delete the output directory {}", outputPath);
+                }
+                return 0;
+            } else {
+                return -1;
+            }
+        }
     }
 
     private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
@@ -354,11 +361,14 @@
             LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
             String tableName = table.getPhysicalName();
             Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
-            org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(conf);
-            Table htable = hbaseConn.getTable(TableName.valueOf(tableName));
-            LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
-            loader.doBulkLoad(tableOutputPath, hbaseConn.getAdmin(), htable, hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
-            LOG.info("Incremental load complete for table=" + tableName);
+            try(org.apache.hadoop.hbase.client.Connection hbaseConn =
+                    ConnectionFactory.createConnection(conf);
+                    Table htable = hbaseConn.getTable(TableName.valueOf(tableName))) {
+                LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
+                loader.doBulkLoad(tableOutputPath, hbaseConn.getAdmin(), htable,
+                        hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
+                LOG.info("Incremental load complete for table=" + tableName);
+            }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
index 713fb05..fba01a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
@@ -393,27 +393,35 @@ public class OrphanViewTool extends Configured implements Tool {
         }
     }
 
-    private void gracefullyDropView(PhoenixConnection phoenixConnection, Configuration configuration,
-            Key key) throws Exception {
-        PhoenixConnection tenantConnection;
-        if (key.getTenantId() != null) {
-            Properties tenantProps = new Properties();
-            tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
-            tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
-                    unwrap(PhoenixConnection.class);
-        } else {
-            tenantConnection = phoenixConnection;
-        }
-
-        MetaDataClient client = new MetaDataClient(tenantConnection);
-        org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
-                .create(key.getSchemaName(), key.getTableName());
+    private void gracefullyDropView(PhoenixConnection phoenixConnection,
+            Configuration configuration, Key key) throws Exception {
+        PhoenixConnection tenantConnection = null;
+        boolean newConn = false;
         try {
-            client.dropTable(
-                    new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
-        }
-        catch (TableNotFoundException e) {
-            LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+            if (key.getTenantId() != null) {
+                Properties tenantProps = new Properties();
+                tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
+                tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
+                        unwrap(PhoenixConnection.class);
+                newConn = true;
+            } else {
+                tenantConnection = phoenixConnection;
+            }
+
+            MetaDataClient client = new MetaDataClient(tenantConnection);
+            org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
+                    .create(key.getSchemaName(), key.getTableName());
+            try {
+                client.dropTable(
+                        new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
+            }
+            catch (TableNotFoundException e) {
+                LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+            }
+        } finally {
+            if (newConn) {
+                tryClosingConnection(tenantConnection);
+            }
         }
     }
 
@@ -775,14 +783,7 @@
     }
 
     private void closeConnectionAndFiles(Connection connection) throws IOException {
-        try {
-            if (connection != null) {
-                connection.close();
-            }
-        } catch (SQLException sqle) {
-            LOG.error("Failed to close connection ", sqle.getMessage());
-            throw new RuntimeException("Failed to close connection");
-        }
+        tryClosingConnection(connection);
         for (byte i = VIEW; i < ORPHAN_TYPE_COUNT; i++) {
             if (writer[i] != null) {
                 writer[i].close();
@@ -794,6 +795,22 @@
     }
 
     /**
+     * Try closing a connection if it is not null
+     * @param connection connection object
+     * @throws RuntimeException if closing the connection fails
+     */
+    private void tryClosingConnection(Connection connection) {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (SQLException sqlE) {
+            LOG.error("Failed to close connection: ", sqlE);
+            throw new RuntimeException("Failed to close connection with exception: ", sqlE);
+        }
+    }
+
+    /**
      * Examples for input arguments:
      * -c : cleans orphan views
      * -c -op /tmp/ : cleans orphan views and links, and logs their names to the files named Orphan*.txt in /tmp/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 52f2fe3..b67ba74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -52,10 +52,20 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul
     }
 
     public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
-        this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
-        this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
-        final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
-        this.statement = this.conn.prepareStatement(upsertQuery);
+        Connection connection = null;
+        try {
+            connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
+            this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            this.statement = connection.prepareStatement(upsertQuery);
+            this.conn = connection;
+        } catch (Exception e) {
+            // Only close the connection in case of an exception, so cannot use try-with-resources
+            if (connection != null) {
+                connection.close();
+            }
+            throw e;
+        }
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index 6100b20..b85a049 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.mapreduce.index;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
@@ -44,8 +45,8 @@ public class DirectHTableWriter {
     private static final Logger LOG = LoggerFactory.getLogger(DirectHTableWriter.class);
 
     private Configuration conf = null;
-
     private Table table;
+    private Connection conn;
 
     public DirectHTableWriter(Configuration otherConf) {
         setConf(otherConf);
@@ -60,11 +61,12 @@
         }
 
         try {
-            Connection conn = ConnectionFactory.createConnection(this.conf);
+            this.conn = ConnectionFactory.createConnection(this.conf);
             this.table = conn.getTable(TableName.valueOf(tableName));
             LOG.info("Created table instance for " + tableName);
         } catch (IOException e) {
             LOG.error("IOException : ", e);
+            tryClosingResourceSilently(this.conn);
             throw new RuntimeException(e);
         }
     }
@@ -99,7 +101,18 @@
         return table;
     }
 
+    private void tryClosingResourceSilently(Closeable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (IOException e) {
+                LOG.error("Closing resource: " + res + " failed with error: ", e);
+            }
+        }
+    }
+
     public void close() throws IOException {
-        table.close();
+        tryClosingResourceSilently(this.table);
+        tryClosingResourceSilently(this.conn);
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index c424787..98d6bac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import com.google.common.base.Strings;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Pair;
@@ -151,10 +150,23 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             LOG.info("Target table base query: " + targetTableQuery);
             md5 = MessageDigest.getInstance("MD5");
         } catch (SQLException | NoSuchAlgorithmException e) {
+            tryClosingResourceSilently(this.outputUpsertStmt);
+            tryClosingResourceSilently(this.connection);
+            tryClosingResourceSilently(this.outputConn);
             throw new RuntimeException(e);
         }
     }
 
+    private static void tryClosingResourceSilently(AutoCloseable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (Exception e) {
+                LOG.error("Closing resource: " + res + " failed :", e);
+            }
+        }
+    }
+
     @Override
     protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
             throws IOException, InterruptedException {
@@ -182,18 +194,21 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         super.cleanup(context);
+        tryClosingResourceSilently(this.outputUpsertStmt);
+        IOException throwException = null;
         if (connection != null) {
             try {
                 processBatch(context);
                 connection.close();
-                if (outputConn != null) {
-                    outputConn.close();
-                }
             } catch (SQLException e) {
                 LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
-                throw new IOException(e);
+                throwException = new IOException(e);
             }
         }
+        tryClosingResourceSilently(this.outputConn);
+        if (throwException != null) {
+            throw throwException;
+        }
     }
 
     private void processBatch(Context context)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index ee2ae0b..bd1a310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -270,6 +270,12 @@ public class IndexTool extends Configured implements Tool {
 
         }
 
+        void closeConnection() throws SQLException {
+            if (this.connection != null) {
+                this.connection.close();
+            }
+        }
+
         public Job getJob() throws Exception {
             if (isPartialBuild) {
                 return configureJobForPartialBuild();
@@ -518,11 +524,13 @@
             final Configuration configuration = job.getConfiguration();
             final String physicalIndexTable =
                     PhoenixConfigurationUtil.getPhysicalTableName(configuration);
-            org.apache.hadoop.hbase.client.Connection conn = ConnectionFactory.createConnection(configuration);
-            TableName tablename = TableName.valueOf(physicalIndexTable);
-            HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tablename),conn.getRegionLocator(tablename));
+            try(org.apache.hadoop.hbase.client.Connection conn =
+                    ConnectionFactory.createConnection(configuration)) {
+                TableName tablename = TableName.valueOf(physicalIndexTable);
+                HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tablename),
+                        conn.getRegionLocator(tablename));
+            }
             return job;
-
         }
 
     /**
@@ -566,6 +574,8 @@
         Connection connection = null;
         Table htable = null;
         RegionLocator regionLocator = null;
+        JobFactory jobFactory = null;
+        org.apache.hadoop.hbase.client.Connection hConn = null;
         try {
             CommandLine cmdLine = null;
             try {
@@ -580,13 +590,14 @@
                 tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
                 configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
-            connection = ConnectionUtil.getInputConnection(configuration);
             schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
             dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
             indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
             isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
             qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+            try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
+                pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
+            }
             useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
             String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -612,8 +623,8 @@
             }
             htable = connection.unwrap(PhoenixConnection.class).getQueryServices()
                     .getTable(pIndexTable.getPhysicalName().getBytes());
-            regionLocator =
-                    ConnectionFactory.createConnection(configuration).getRegionLocator(
+            hConn = ConnectionFactory.createConnection(configuration);
+            regionLocator = hConn.getRegionLocator(
                     TableName.valueOf(pIndexTable.getPhysicalName().getBytes()));
             if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
                 isLocalIndexBuild = true;
@@ -641,7 +652,8 @@
                 fs.delete(outputPath, true);
             }
 
-            job = new JobFactory(connection, configuration, outputPath).getJob();
+            jobFactory = new JobFactory(connection, configuration, outputPath);
+            job = jobFactory.getJob();
 
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and exit");
@@ -675,19 +687,52 @@
                     + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
             return -1;
         } finally {
+            boolean rethrowException = false;
             try {
                 if (connection != null) {
-                    connection.close();
+                    try {
+                        connection.close();
+                    } catch (SQLException e) {
+                        LOG.error("Failed to close connection ", e);
+                        rethrowException = true;
+                    }
                 }
                 if (htable != null) {
-                    htable.close();
+                    try {
+                        htable.close();
+                    } catch (IOException e) {
+                        LOG.error("Failed to close htable ", e);
+                        rethrowException = true;
+                    }
+                }
+                if (hConn != null) {
+                    try {
+                        hConn.close();
+                    } catch (IOException e) {
+                        LOG.error("Failed to close hconnection ", e);
+                        rethrowException = true;
+                    }
+                }
+                if (regionLocator != null) {
+                    try {
+                        regionLocator.close();
+                    } catch (IOException e) {
+                        LOG.error("Failed to close regionLocator ", e);
+                        rethrowException = true;
+                    }
+                }
+                if (jobFactory != null) {
+                    try {
+                        jobFactory.closeConnection();
+                    } catch (SQLException e) {
+                        LOG.error("Failed to close jobFactory ", e);
+                        rethrowException = true;
+                    }
                 }
-                if(regionLocator != null) {
-                    regionLocator.close();
+            } finally {
+                if (rethrowException) {
+                    throw new RuntimeException("Failed to close resource");
                 }
-            } catch (SQLException sqle) {
-                LOG.error("Failed to close connection ", sqle.getMessage());
-                throw new RuntimeException("Failed to close connection");
             }
         }
     }
@@ -695,11 +740,11 @@
     private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit,
             int autosplitNumRegions, double samplingRate, Configuration configuration)
             throws SQLException, IOException, IllegalArgumentException, InterruptedException {
         int numRegions;
-
-        try (RegionLocator regionLocator =
-                ConnectionFactory.createConnection(configuration).getRegionLocator(
-                        TableName.valueOf(qDataTable))) {
+        try (org.apache.hadoop.hbase.client.Connection tempHConn =
+                ConnectionFactory.createConnection(configuration);
+                RegionLocator regionLocator =
+                        tempHConn.getRegionLocator(TableName.valueOf(qDataTable))) {
             numRegions = regionLocator.getStartKeys().length;
             if (autosplit && !(numRegions > autosplitNumRegions)) {
                 LOG.info(String.format(
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index e2ac491..e148f67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -105,7 +105,8 @@ public class PhoenixIndexImportDirectMapper extends
             final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
             this.pStatement = connection.prepareStatement(upsertQuery);
 
-        } catch (SQLException e) {
+        } catch (Exception e) {
+            tryClosingResources();
             throw new RuntimeException(e);
         }
     }
@@ -179,17 +180,20 @@ public class PhoenixIndexImportDirectMapper extends
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                        e.getMessage());
-                }
-            }
-            if (writer != null) {
-                writer.close();
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
         }
+        if (this.writer != null) {
+            this.writer.close();
+        }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 8318adf..5253bfd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -94,6 +94,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
             this.pStatement = connection.prepareStatement(upsertQuery);
 
         } catch (SQLException e) {
+            tryClosingConnection();
             throw new RuntimeException(e.getMessage());
         }
     }
@@ -162,14 +163,17 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-         super.cleanup(context);
-         if (connection != null) {
-             try {
+        super.cleanup(context);
+        tryClosingConnection();
+    }
+
+    private void tryClosingConnection() {
+        if (connection != null) {
+            try {
                 connection.close();
             } catch (SQLException e) {
-                LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                    e.getMessage());
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
-         }
+        }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index 67ec62b..c79359d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -96,6 +96,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
             this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
             maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
         } catch (SQLException e) {
+            tryClosingResources();
             throw new RuntimeException(e.getMessage());
         }
     }
@@ -170,17 +171,21 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
             throw new RuntimeException(e);
         } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                        e.getMessage());
-                }
-            }
-            if (writer != null) {
-                writer.close();
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
         }
+        if (this.writer != null) {
+            this.writer.close();
+        }
     }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 9d8e12e..f09cf0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -329,18 +329,20 @@ public final class PhoenixConfigurationUtil {
         }
         final String tableName = getOutputTableName(configuration);
         Preconditions.checkNotNull(tableName);
-        final Connection connection = ConnectionUtil.getOutputConnection(configuration);
-        List<String> upsertColumnList = PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
-        if(!upsertColumnList.isEmpty()) {
-            LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s, upsertColumnList=%s "
-                ,!upsertColumnList.isEmpty(), upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)
-                ));
-        }
-        columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
-        // we put the encoded column infos in the Configuration for re usability.
-        ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
-        connection.close();
-        return columnMetadataList;
+        try (final Connection connection = ConnectionUtil.getOutputConnection(configuration)) {
+            List<String> upsertColumnList =
+                    PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
+            if(!upsertColumnList.isEmpty()) {
+                LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s,"
+                        + " upsertColumnList=%s ",!upsertColumnList.isEmpty(),
+                        upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)));
+            }
+            columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName,
+                    upsertColumnList);
+            // we put the encoded column infos in the Configuration for re usability.
+            ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+        }
+        return columnMetadataList;
     }
 
     public static String getUpsertStatement(final Configuration configuration) throws SQLException {
@@ -387,12 +389,13 @@
         if (tenantId != null) {
             props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
-        final Connection connection = ConnectionUtil.getInputConnection(configuration, props);
-        final List<String> selectColumnList = getSelectColumnList(configuration);
-        columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
-        // we put the encoded column infos in the Configuration for re usability.
-        ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
-        connection.close();
+        try (final Connection connection = ConnectionUtil.getInputConnection(configuration, props)){
+            final List<String> selectColumnList = getSelectColumnList(configuration);
+            columnMetadataList =
+                    PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+            // we put the encoded column infos in the Configuration for re usability.
+            ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+        }
         return columnMetadataList;
     }
 
@@ -428,9 +431,9 @@
         Preconditions.checkNotNull(configuration);
         long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
         if(batchSize <= 0) {
-            Connection conn = ConnectionUtil.getOutputConnection(configuration);
-            batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
-            conn.close();
+            try (Connection conn = ConnectionUtil.getOutputConnection(configuration)) {
+                batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+            }
         }
         configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
         return batchSize;
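
The recurring fix in this patch is a single pattern: either manage the HBase/Phoenix connection with try-with-resources, or close it explicitly when a later construction step fails and again on the normal close path. The sketch below is illustrative only and is not part of the commit; it uses plain JDBC types, and the class name, JDBC URL, and UPSERT statement are made-up placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical example (not from the patch) of the close-on-failure pattern
// applied above in PhoenixRecordWriter and RecoveryIndexWriter.
public class LeakFreeWriter implements AutoCloseable {

    private final Connection conn;
    private final PreparedStatement stmt;

    public LeakFreeWriter(String jdbcUrl, String upsertSql) throws SQLException {
        Connection connection = null;
        try {
            connection = DriverManager.getConnection(jdbcUrl);
            this.stmt = connection.prepareStatement(upsertSql);
            this.conn = connection;
        } catch (SQLException | RuntimeException e) {
            // Close only on failure; on success the instance owns the connection.
            if (connection != null) {
                connection.close();
            }
            throw e;
        }
    }

    @Override
    public void close() throws SQLException {
        // Close the dependent resource first, then the connection,
        // even if the first close fails.
        try {
            stmt.close();
        } finally {
            conn.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // try-with-resources guarantees close() even when the body throws,
        // which is the guarantee the patch adds around HBase Connections.
        try (LeakFreeWriter writer = new LeakFreeWriter(
                "jdbc:phoenix:localhost:2181", "UPSERT INTO T VALUES (?, ?)")) {
            // write records here
        }
    }
}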