This is an automated email from the ASF dual-hosted git repository. tdsilva pushed a commit to branch 4.14-HBase-1.3 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit fc54cf39ad1fdc27db6bd39251b155d988d4895a Author: Chinmay Kulkarni <chinmayskulka...@gmail.com> AuthorDate: Thu Mar 14 23:16:14 2019 -0700 PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil --- .../UngroupedAggregateRegionObserver.java | 6 ++- .../hbase/index/write/RecoveryIndexWriter.java | 10 ++-- .../phoenix/mapreduce/AbstractBulkLoadTool.java | 15 ++---- .../apache/phoenix/mapreduce/OrphanViewTool.java | 53 ++++++++++----------- .../phoenix/mapreduce/PhoenixRecordWriter.java | 18 +++++-- .../mapreduce/index/DirectHTableWriter.java | 14 +++++- .../mapreduce/index/IndexScrutinyMapper.java | 24 ++++++++-- .../apache/phoenix/mapreduce/index/IndexTool.java | 55 ++++++++++++++++------ .../index/PhoenixIndexImportDirectMapper.java | 26 +++++----- .../mapreduce/index/PhoenixIndexImportMapper.java | 16 ++++--- .../index/PhoenixIndexPartialBuildMapper.java | 25 ++++++---- .../mapreduce/util/PhoenixConfigurationUtil.java | 45 +++++++++--------- .../apache/phoenix/parse/DropTableStatement.java | 4 +- 13 files changed, 190 insertions(+), 121 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 2eb15a1..f0ce5b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -817,7 +817,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } try { if (targetHTable != null) { - targetHTable.close(); + try { + targetHTable.close(); + } catch (IOException e) { + logger.error("Closing table: " + targetHTable + " failed: ", e); + } } } finally { try { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java index 35f0a6d..fb96666 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java @@ -26,8 +26,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -55,15 +53,13 @@ public class RecoveryIndexWriter extends IndexWriter { * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup * before calling. * - * @param committer * @param policy * @param env + * @param name * @throws IOException - * @throws ZooKeeperConnectionException - * @throws MasterNotRunningException */ public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name) - throws MasterNotRunningException, ZooKeeperConnectionException, IOException { + throws IOException { super(new TrackingParallelWriterIndexCommitter(), policy, env, name); this.admin = new HBaseAdmin(env.getConfiguration()); } @@ -125,7 +121,7 @@ public class RecoveryIndexWriter extends IndexWriter { try { admin.close(); } catch (IOException e) { - // closing silently + LOG.error("Closing the admin failed: ", e); } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index f717647..4561152 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -17,7 +17,6 @@ */ package org.apache.phoenix.mapreduce; -import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -37,20 +36,17 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; -import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixDriver; @@ -62,7 +58,6 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -70,7 +65,6 @@ import org.apache.phoenix.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.Lists; @@ -350,10 +344,11 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); String tableName = table.getPhysicalName(); Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName); - HTable htable = new HTable(conf,tableName); - LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath); - loader.doBulkLoad(tableOutputPath, htable); - LOG.info("Incremental load complete for table=" + tableName); + try(HTable htable = new HTable(conf,tableName)) { + LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath); + loader.doBulkLoad(tableOutputPath, htable); + LOG.info("Incremental load complete for table=" + tableName); + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java index 2e0dd0d..f5ea35a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java @@ -408,14 +408,16 @@ public class OrphanViewTool extends Configured implements Tool { } else { tenantConnection = phoenixConnection; } - String fullViewName = SchemaUtil.getTableName(key.getSchemaName(), key.getTableName()); - String dropTable = String.format("DROP VIEW IF EXISTS %s CASCADE", fullViewName); + + MetaDataClient client = new MetaDataClient(tenantConnection); + org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName + .create(key.getSchemaName(), key.getTableName()); try { - tenantConnection.createStatement().execute(dropTable); - tenantConnection.commit(); + client.dropTable( + new DropTableStatement(pTableName, PTableType.VIEW, false, true)); } catch (TableNotFoundException e) { - LOG.info("Ignoring view " + fullViewName + " as it has already been dropped"); + LOG.info("Ignoring view " + pTableName + " as it has already been dropped"); } } finally { if (newConn) { @@ -424,22 +426,6 @@ public class OrphanViewTool extends Configured implements Tool { } } - /** - * Try closing a connection if it is not null - * @param connection connection object - * @throws RuntimeException if closing the connection fails - */ - private void tryClosingConnection(Connection connection) { - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException sqlE) { - LOG.error("Failed to close connection: ", sqlE); - throw new RuntimeException("Failed to close connection with exception: ", sqlE); - } - } - private void removeLink(PhoenixConnection phoenixConnection, Key src, Key dst, PTable.LinkType linkType) throws Exception { String deleteQuery = "DELETE FROM " + ((linkType == PTable.LinkType.PHYSICAL_TABLE || linkType == PTable.LinkType.PARENT_TABLE) ? SYSTEM_CATALOG_NAME : SYSTEM_CHILD_LINK_NAME) + @@ -798,14 +784,7 @@ public class OrphanViewTool extends Configured implements Tool { } private void closeConnectionAndFiles(Connection connection) throws IOException { - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException sqle) { - LOG.error("Failed to close connection ", sqle.getMessage()); - throw new RuntimeException("Failed to close connection"); - } + tryClosingConnection(connection); for (byte i = VIEW; i < ORPHAN_TYPE_COUNT; i++) { if (writer[i] != null) { writer[i].close(); @@ -817,6 +796,22 @@ public class OrphanViewTool extends Configured implements Tool { } /** + * Try closing a connection if it is not null + * @param connection connection object + * @throws RuntimeException if closing the connection fails + */ + private void tryClosingConnection(Connection connection) { + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException sqlE) { + LOG.error("Failed to close connection: ", sqlE); + throw new RuntimeException("Failed to close connection with exception: ", sqlE); + } + } + + /** * Examples for input arguments: * -c : cleans orphan views * -c -op /tmp/ : cleans orphan views and links, and logs their names to the files named Orphan*.txt in /tmp/ diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java index 52f2fe3..b67ba74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java @@ -52,10 +52,20 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul } public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException { - this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore); - this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration); - final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); - this.statement = this.conn.prepareStatement(upsertQuery); + Connection connection = null; + try { + connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore); + this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration); + final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); + this.statement = connection.prepareStatement(upsertQuery); + this.conn = connection; + } catch (Exception e) { + // Only close the connection in case of an exception, so cannot use try-with-resources + if (connection != null) { + connection.close(); + } + throw e; + } } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java index 59b26b2..32d2f3b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.mapreduce.index; +import java.io.Closeable; import java.io.IOException; import java.util.List; @@ -56,6 +57,7 @@ public class DirectHTableWriter { LOG.info("Created table instance for " + tableName); } catch (IOException e) { LOG.error("IOException : ", e); + tryClosingResourceSilently(this.table); throw new RuntimeException(e); } } @@ -73,7 +75,17 @@ public class DirectHTableWriter { return table; } + private void tryClosingResourceSilently(Closeable res) { + if (res != null) { + try { + res.close(); + } catch (IOException e) { + LOG.error("Closing resource: " + res + " failed with error: ", e); + } + } + } + public void close() throws IOException { - table.close(); + tryClosingResourceSilently(this.table); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java index 81081bf..c651077 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java @@ -149,10 +149,23 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit LOG.info("Target table base query: " + targetTableQuery); md5 = MessageDigest.getInstance("MD5"); } catch (SQLException | NoSuchAlgorithmException e) { + tryClosingResourceSilently(this.outputUpsertStmt); + tryClosingResourceSilently(this.connection); + tryClosingResourceSilently(this.outputConn); throw new RuntimeException(e); } } + private static void tryClosingResourceSilently(AutoCloseable res) { + if (res != null) { + try { + res.close(); + } catch (Exception e) { + LOG.error("Closing resource: " + res + " failed :", e); + } + } + } + @Override protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) throws IOException, InterruptedException { @@ -180,18 +193,21 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); + tryClosingResourceSilently(this.outputUpsertStmt); + IOException throwException = null; if (connection != null) { try { processBatch(context); connection.close(); - if (outputConn != null) { - outputConn.close(); - } } catch (SQLException e) { LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e); - throw new IOException(e); + throwException = new IOException(e); } } + tryClosingResourceSilently(this.outputConn); + if (throwException != null) { + throw throwException; + } } private void processBatch(Context context) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index a665a91..c91f53a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -267,6 +267,12 @@ public class IndexTool extends Configured implements Tool { } + void closeConnection() throws SQLException { + if (this.connection != null) { + this.connection.close(); + } + } + public Job getJob() throws Exception { if (isPartialBuild) { return configureJobForPartialBuild(); @@ -514,10 +520,10 @@ public class IndexTool extends Configured implements Tool { final Configuration configuration = job.getConfiguration(); final String physicalIndexTable = PhoenixConfigurationUtil.getPhysicalTableName(configuration); - final HTable htable = new HTable(configuration, physicalIndexTable); - HFileOutputFormat.configureIncrementalLoad(job, htable); + try(final HTable htable = new HTable(configuration, physicalIndexTable)) { + HFileOutputFormat.configureIncrementalLoad(job, htable); + } return job; - } /** @@ -525,9 +531,6 @@ public class IndexTool extends Configured implements Tool { * waits for the job completion based on runForeground parameter. * * @param job - * @param outputPath - * @param runForeground - if true, waits for job completion, else submits and returns - * immediately. * @return * @throws Exception */ @@ -560,6 +563,7 @@ public class IndexTool extends Configured implements Tool { public int run(String[] args) throws Exception { Connection connection = null; HTable htable = null; + JobFactory jobFactory = null; try { CommandLine cmdLine = null; try { @@ -574,13 +578,14 @@ public class IndexTool extends Configured implements Tool { tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } - connection = ConnectionUtil.getInputConnection(configuration); schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()); qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable); + try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) { + pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable); + } useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); @@ -607,7 +612,6 @@ public class IndexTool extends Configured implements Tool { } htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices() .getTable(pIndexTable.getPhysicalName().getBytes()); - if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) { isLocalIndexBuild = true; splitKeysBeforeJob = htable.getRegionLocator().getStartKeys(); @@ -635,7 +639,8 @@ public class IndexTool extends Configured implements Tool { fs.delete(outputPath, true); } - job = new JobFactory(connection, configuration, outputPath).getJob(); + jobFactory = new JobFactory(connection, configuration, outputPath); + job = jobFactory.getJob(); if (!isForeground && useDirectApi) { LOG.info("Running Index Build in Background - Submit async and exit"); @@ -668,16 +673,36 @@ public class IndexTool extends Configured implements Tool { + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex)); return -1; } finally { + boolean rethrowException = false; try { if (connection != null) { - connection.close(); + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Failed to close connection ", e); + rethrowException = true; + } } if (htable != null) { - htable.close(); + try { + htable.close(); + } catch (IOException e) { + LOG.error("Failed to close htable ", e); + rethrowException = true; + } + } + if (jobFactory != null) { + try { + jobFactory.closeConnection(); + } catch (SQLException e) { + LOG.error("Failed to close jobFactory ", e); + rethrowException = true; + } + } + } finally { + if (rethrowException) { + throw new RuntimeException("Failed to close resource"); } - } catch (SQLException sqle) { - LOG.error("Failed to close connection ", sqle.getMessage()); - throw new RuntimeException("Failed to close connection"); } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java index eb4bc0e..7328014 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java @@ -103,7 +103,8 @@ public class PhoenixIndexImportDirectMapper extends final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); this.pStatement = connection.prepareStatement(upsertQuery); - } catch (SQLException e) { + } catch (Exception e) { + tryClosingResources(); throw new RuntimeException(e); } } @@ -176,17 +177,20 @@ public class PhoenixIndexImportDirectMapper extends context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); throw new RuntimeException(e); } finally { - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ", - e.getMessage()); - } - } - if (writer != null) { - writer.close(); + tryClosingResources(); + } + } + + private void tryClosingResources() throws IOException { + if (this.connection != null) { + try { + this.connection.close(); + } catch (SQLException e) { + LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e); } } + if (this.writer != null) { + this.writer.close(); + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java index 9e0d629..e060bc3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java @@ -83,6 +83,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD this.pStatement = connection.prepareStatement(upsertQuery); } catch (SQLException e) { + tryClosingConnection(); throw new RuntimeException(e.getMessage()); } } @@ -125,14 +126,17 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD @Override protected void cleanup(Context context) throws IOException, InterruptedException { - super.cleanup(context); - if (connection != null) { - try { + super.cleanup(context); + tryClosingConnection(); + } + + private void tryClosingConnection() { + if (connection != null) { + try { connection.close(); } catch (SQLException e) { - LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ", - e.getMessage()); + LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e); } - } + } } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java index f4ecac2..2077137 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java @@ -96,6 +96,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr this.mutations = Lists.newArrayListWithExpectedSize(batchSize); maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration)); } catch (SQLException e) { + tryClosingResources(); throw new RuntimeException(e.getMessage()); } } @@ -170,17 +171,21 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); throw new RuntimeException(e); } finally { - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ", - e.getMessage()); - } - } - if (writer != null) { - writer.close(); + tryClosingResources(); + } + } + + private void tryClosingResources() throws IOException { + if (this.connection != null) { + try { + this.connection.close(); + } catch (SQLException e) { + LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e); } } + if (this.writer != null) { + this.writer.close(); + } } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 6788e5f..0ff7904 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -303,18 +303,20 @@ public final class PhoenixConfigurationUtil { } final String tableName = getOutputTableName(configuration); Preconditions.checkNotNull(tableName); - final Connection connection = ConnectionUtil.getOutputConnection(configuration); - List<String> upsertColumnList = PhoenixConfigurationUtil.getUpsertColumnNames(configuration); - if(!upsertColumnList.isEmpty()) { - LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s, upsertColumnList=%s " - ,!upsertColumnList.isEmpty(), upsertColumnList.size(), Joiner.on(",").join(upsertColumnList) - )); - } - columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList); - // we put the encoded column infos in the Configuration for re usability. - ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); - connection.close(); - return columnMetadataList; + try (final Connection connection = ConnectionUtil.getOutputConnection(configuration)) { + List<String> upsertColumnList = + PhoenixConfigurationUtil.getUpsertColumnNames(configuration); + if(!upsertColumnList.isEmpty()) { + LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s," + + " upsertColumnList=%s ",!upsertColumnList.isEmpty(), + upsertColumnList.size(), Joiner.on(",").join(upsertColumnList))); + } + columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, + upsertColumnList); + // we put the encoded column infos in the Configuration for re usability. + ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); + } + return columnMetadataList; } public static String getUpsertStatement(final Configuration configuration) throws SQLException { @@ -361,12 +363,13 @@ public final class PhoenixConfigurationUtil { if (tenantId != null) { props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } - final Connection connection = ConnectionUtil.getInputConnection(configuration, props); - final List<String> selectColumnList = getSelectColumnList(configuration); - columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList); - // we put the encoded column infos in the Configuration for re usability. - ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); - connection.close(); + try (final Connection connection = ConnectionUtil.getInputConnection(configuration, props)){ + final List<String> selectColumnList = getSelectColumnList(configuration); + columnMetadataList = + PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList); + // we put the encoded column infos in the Configuration for re usability. + ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); + } return columnMetadataList; } @@ -401,9 +404,9 @@ public final class PhoenixConfigurationUtil { Preconditions.checkNotNull(configuration); long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE); if(batchSize <= 0) { - Connection conn = ConnectionUtil.getOutputConnection(configuration); - batchSize = ((PhoenixConnection) conn).getMutateBatchSize(); - conn.close(); + try (Connection conn = ConnectionUtil.getOutputConnection(configuration)) { + batchSize = ((PhoenixConnection) conn).getMutateBatchSize(); + } } configuration.setLong(UPSERT_BATCH_SIZE, batchSize); return batchSize; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java index 997b695..7cadb19 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java @@ -25,9 +25,9 @@ public class DropTableStatement extends MutableStatement { private final boolean ifExists; private final PTableType tableType; private final boolean cascade; - - protected DropTableStatement(TableName tableName, PTableType tableType, boolean ifExists, boolean cascade) { + + public DropTableStatement(TableName tableName, PTableType tableType, boolean ifExists, boolean cascade) { this.tableName = tableName; this.tableType = tableType; this.ifExists = ifExists;