Repository: phoenix
Updated Branches:
  refs/heads/master e554e85b4 -> 8a34e7cf2
PHOENIX-1143 Prevent race condition between creating phoenix connection and closing phoenix driver/connection query services (Samarth Jain) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8a34e7cf Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8a34e7cf Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8a34e7cf Branch: refs/heads/master Commit: 8a34e7cf28890d9a07bd7022e432cdb321459174 Parents: e554e85 Author: James Taylor <[email protected]> Authored: Fri Aug 8 14:41:38 2014 -0700 Committer: James Taylor <[email protected]> Committed: Fri Aug 8 14:41:38 2014 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/DecodeFunctionIT.java | 9 +- .../org/apache/phoenix/jdbc/PhoenixDriver.java | 163 ++++++++++++------- .../phoenix/jdbc/PhoenixEmbeddedDriver.java | 3 + .../query/ConnectionQueryServicesImpl.java | 79 ++++++--- .../apache/phoenix/jdbc/PhoenixTestDriver.java | 17 +- 5 files changed, 181 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a34e7cf/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java index 19dad3e..05e2504 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.end2end; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -26,12 +29,10 @@ import java.sql.SQLException; import 
org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.PDataType; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import org.junit.Test; +import org.junit.experimental.categories.Category; +@Category(HBaseManagedTimeTest.class) public class DecodeFunctionIT extends BaseHBaseManagedTimeIT { @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a34e7cf/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java index ac8f330..10b15dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.jdbc; +import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.Collection; @@ -24,6 +25,10 @@ import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.annotation.concurrent.GuardedBy; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.ConnectionQueryServicesImpl; @@ -52,6 +57,7 @@ import org.slf4j.LoggerFactory; public final class PhoenixDriver extends PhoenixEmbeddedDriver { private static final Logger logger = LoggerFactory.getLogger(PhoenixDriver.class); public static final PhoenixDriver INSTANCE; + private static volatile String driverShutdownMsg; static { try { DriverManager.registerDriver( INSTANCE = new PhoenixDriver() ); @@ -64,6 +70,8 @@ public final class PhoenixDriver 
extends PhoenixEmbeddedDriver { INSTANCE.close(); } catch (SQLException e) { logger.warn("Unable to close PhoenixDriver on shutdown", e); + } finally { + driverShutdownMsg = "Phoenix driver closed because server is shutting down"; } } }); @@ -78,27 +86,37 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { // Use production services implementation super(); } - + + // writes guarded by "this" private volatile QueryServices services; + + @GuardedBy("closeLock") private volatile boolean closed = false; + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + @Override public QueryServices getQueryServices() { - checkClosed(); + try { + closeLock.readLock().lock(); + checkClosed(); - // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration - // object upon the first attempt to connect to any cluster. Otherwise, an attempt will be - // made at driver initialization time which is too early for some systems. - QueryServices result = services; - if (result == null) { - synchronized(this) { - result = services; - if(result == null) { - services = result = new QueryServicesImpl(getDefaultProps()); - } - } - } - return result; + // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration + // object upon the first attempt to connect to any cluster. Otherwise, an attempt will be + // made at driver initialization time which is too early for some systems. 
+ QueryServices result = services; + if (result == null) { + synchronized(this) { + result = services; + if(result == null) { + services = result = new QueryServicesImpl(getDefaultProps()); + } + } + } + return result; + } finally { + closeLock.readLock().unlock(); + } } @Override @@ -106,71 +124,96 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { // Accept the url only if test=true attribute not set return super.acceptsURL(url) && !isTestUrl(url); } - + + @Override + public Connection connect(String url, Properties info) throws SQLException { + try { + closeLock.readLock().lock(); + checkClosed(); + return super.connect(url, info); + } finally { + closeLock.readLock().unlock(); + } + } + @Override protected ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException { - checkClosed(); - - ConnectionInfo connInfo = ConnectionInfo.create(url); - QueryServices services = getQueryServices(); - ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps()); - ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo); - if (connectionQueryServices == null) { - if (normalizedConnInfo.isConnectionless()) { - connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo); - } else { - connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo); + try { + closeLock.readLock().lock(); + checkClosed(); + ConnectionInfo connInfo = ConnectionInfo.create(url); + QueryServices services = getQueryServices(); + ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps()); + ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo); + if (connectionQueryServices == null) { + if (normalizedConnInfo.isConnectionless()) { + connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo); + } else { + connectionQueryServices = new 
ConnectionQueryServicesImpl(services, normalizedConnInfo); + } + ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices); + if (prevValue != null) { + connectionQueryServices = prevValue; + } } - ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices); - if (prevValue != null) { - connectionQueryServices = prevValue; + boolean success = false; + SQLException sqlE = null; + try { + connectionQueryServices.init(url, info); + success = true; + } catch (SQLException e) { + sqlE = e; } - } - boolean success = false; - SQLException sqlE = null; - try { - connectionQueryServices.init(url, info); - success = true; - } catch (SQLException e) { - sqlE = e; - } - finally { - if (!success) { - try { - connectionQueryServices.close(); - } catch (SQLException e) { - if (sqlE == null) { - sqlE = e; - } else { - sqlE.setNextException(e); - } - } finally { - // Remove from map, as initialization failed - connectionQueryServicesMap.remove(normalizedConnInfo); - if (sqlE != null) { - throw sqlE; + finally { + if (!success) { + try { + connectionQueryServices.close(); + } catch (SQLException e) { + if (sqlE == null) { + sqlE = e; + } else { + sqlE.setNextException(e); + } + } finally { + // Remove from map, as initialization failed + connectionQueryServicesMap.remove(normalizedConnInfo); + if (sqlE != null) { + throw sqlE; + } } } } + return connectionQueryServices; + } finally { + closeLock.readLock().unlock(); } - return connectionQueryServices; } private void checkClosed() { if (closed) { - throw new IllegalStateException("The Phoenix jdbc driver has been closed."); + throwDriverClosedException(); } } + + private void throwDriverClosedException() { + throw new IllegalStateException(driverShutdownMsg != null ? 
driverShutdownMsg : "The Phoenix jdbc driver has been closed."); + } @Override public synchronized void close() throws SQLException { - if (closed) { - return; + try { + closeLock.writeLock().lock(); + if (closed) { + return; + } + closed = true; + } finally { + closeLock.writeLock().unlock(); } - closed = true; + try { Collection<ConnectionQueryServices> connectionQueryServices = connectionQueryServicesMap.values(); - try { + try { SQLCloseables.closeAll(connectionQueryServices); } finally { connectionQueryServices.clear(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a34e7cf/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java index ca27075..c706486 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java @@ -29,6 +29,8 @@ import java.util.Properties; import java.util.StringTokenizer; import java.util.logging.Logger; +import javax.annotation.concurrent.Immutable; + import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -50,6 +52,7 @@ import com.google.common.collect.Maps; * * @since 0.1 */ +@Immutable public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoenix.jdbc.Jdbc7Shim.Driver, SQLCloseable { /** * The protocol for Phoenix Network Client http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a34e7cf/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 066e753..eb77d53 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -38,6 +38,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import javax.annotation.concurrent.GuardedBy; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -151,9 +153,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final String userName; private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices; private final StatsManager statsManager; + // Cache the latest meta data here for future connections - private volatile PMetaData latestMetaData; + @GuardedBy("latestMetaDataLock") + private PMetaData latestMetaData; private final Object latestMetaDataLock = new Object(); + // Lowest HBase version on the cluster. 
private int lowestClusterHBaseVersion = Integer.MAX_VALUE; private boolean hasInvalidIndexConfiguration = false; @@ -161,7 +166,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private HConnection connection; private volatile boolean initialized; + + // writes guarded by "this" private volatile boolean closed; + private volatile SQLException initializationException; protected ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap(); private KeyValueBuilder kvBuilder; @@ -305,7 +313,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } finally { try { childServices.clear(); - latestMetaData = null; + synchronized (latestMetaDataLock) { + latestMetaData = null; + latestMetaDataLock.notifyAll(); + } if (connection != null) connection.close(); } catch (IOException e) { if (sqlE == null) { @@ -397,16 +408,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public PMetaData addTable(PTable table) throws SQLException { - try { - // If existing table isn't older than new table, don't replace - // If a client opens a connection at an earlier timestamp, this can happen - PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString())); - if (existingTable.getTimeStamp() >= table.getTimeStamp()) { - return latestMetaData; - } - } catch (TableNotFoundException e) { - } - synchronized(latestMetaDataLock) { + synchronized (latestMetaDataLock) { + try { + throwConnectionClosedIfNullMetaData(); + // If existing table isn't older than new table, don't replace + // If a client opens a connection at an earlier timestamp, this can happen + PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString())); + if (existingTable.getTimeStamp() >= table.getTimeStamp()) { + return latestMetaData; + } + } catch (TableNotFoundException e) {} latestMetaData = 
latestMetaData.addTable(table); latestMetaDataLock.notifyAll(); return latestMetaData; @@ -422,7 +433,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * @param tenantId TODO */ private PMetaData metaDataMutated(PName tenantId, String tableName, long tableSeqNum, Mutator mutator) throws SQLException { - synchronized(latestMetaDataLock) { + synchronized (latestMetaDataLock) { + throwConnectionClosedIfNullMetaData(); PMetaData metaData = latestMetaData; PTable table; long endTime = System.currentTimeMillis() + DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS; @@ -483,6 +495,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public PMetaData removeTable(PName tenantId, final String tableName) throws SQLException { synchronized(latestMetaDataLock) { + throwConnectionClosedIfNullMetaData(); latestMetaData = latestMetaData.removeTable(tenantId, tableName); latestMetaDataLock.notifyAll(); return latestMetaData; @@ -507,7 +520,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public PhoenixConnection connect(String url, Properties info) throws SQLException { - return new PhoenixConnection(this, url, info, latestMetaData); + checkClosed(); + synchronized (latestMetaDataLock) { + throwConnectionClosedIfNullMetaData(); + latestMetaDataLock.notifyAll(); + return new PhoenixConnection(this, url, info, latestMetaData); + } } @@ -945,7 +963,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement String parentTableName = Bytes.toString(physicalTableName, MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX_BYTES.length, physicalTableName.length - MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX_BYTES.length); try { - table = latestMetaData.getTable(new PTableKey(PName.EMPTY_NAME, parentTableName)); + synchronized (latestMetaDataLock) { + throwConnectionClosedIfNullMetaData(); + table = latestMetaData.getTable(new PTableKey(PName.EMPTY_NAME, 
parentTableName)); + latestMetaDataLock.notifyAll(); + } if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); } @@ -1247,7 +1269,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length, physicalIndexTableName.length-MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length); try { - table = latestMetaData.getTable(new PTableKey(tenantId, name)); + synchronized (latestMetaDataLock) { + throwConnectionClosedIfNullMetaData(); + table = latestMetaData.getTable(new PTableKey(tenantId, name)); + latestMetaDataLock.notifyAll(); + } if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); } @@ -1452,10 +1478,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return null; } - if (closed) { - throw new SQLException("The connection to the cluster has been closed."); - } - + checkClosed(); SQLException sqlE = null; PhoenixConnection metaConnection = null; try { @@ -1970,4 +1993,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return userName; } + private void checkClosed() { + if (closed) { + throwConnectionClosedException(); + } + } + + private void throwConnectionClosedIfNullMetaData() { + if (latestMetaData == null) { + throwConnectionClosedException(); + } + } + + private void throwConnectionClosedException() { + throw new IllegalStateException("Connection to the cluster is closed"); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a34e7cf/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java ---------------------------------------------------------------------- diff 
--git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java index 2423344..0d3c461 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.jdbc; +import java.sql.Connection; import java.sql.SQLException; import java.util.Properties; @@ -50,9 +51,8 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { @GuardedBy("this") private final QueryServices queryServices; - //The only place it is modified is under a lock provided by "this". - //So ok to have it just as volatile. - private volatile boolean closed = false; + @GuardedBy("this") + private boolean closed = false; public PhoenixTestDriver() { this.overrideProps = ReadOnlyProps.EMPTY_PROPS; @@ -73,11 +73,16 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { @Override public boolean acceptsURL(String url) throws SQLException { - checkClosed(); // Accept the url only if test=true attribute set return super.acceptsURL(url) && isTestUrl(url); } - + + @Override + public synchronized Connection connect(String url, Properties info) throws SQLException { + checkClosed(); + return super.connect(url, info); + } + @Override // public for testing public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException { checkClosed(); @@ -92,7 +97,7 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { return connectionQueryServices; } - private void checkClosed() { + private synchronized void checkClosed() { if (closed) { throw new IllegalStateException("The Phoenix jdbc test driver has been closed."); }
