Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-9 60d074cc7 -> 89b9aaff8
GEODE-114: fix race condition in DefaultGemFireConnection.getRegionProxy Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/dc1c1559 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/dc1c1559 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/dc1c1559 Branch: refs/heads/feature/GEODE-9 Commit: dc1c1559dd8a38ffade204f5440755338de2a5c8 Parents: 2793365 Author: Qihong Chen <qc...@pivotal.io> Authored: Tue Jul 14 16:49:18 2015 -0700 Committer: Qihong Chen <qc...@pivotal.io> Committed: Wed Jul 15 09:44:34 2015 -0700 ---------------------------------------------------------------------- .../internal/DefaultGemFireConnection.scala | 8 ++++--- .../DefaultGemFireConnectionManager.scala | 24 ++++++++++++++------ 2 files changed, 22 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc1c1559/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala index e31186b..bba6c69 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala @@ -23,8 +23,6 @@ private[connector] class DefaultGemFireConnection ( extends GemFireConnection with Logging { private val clientCache = initClientCache() - /** a lock object only used by getRegionProxy...() */ - private val regionLock = new Object /** Register GemFire functions to the GemFire cluster */ FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance()) @@ -81,7 +79,7 @@ private[connector] class DefaultGemFireConnection ( def getRegionProxy[K, V](regionPath: String): Region[K, V] = { val region1: Region[K, V] = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]] if (region1 != null) region1 - else regionLock.synchronized { + else DefaultGemFireConnection.regionLock.synchronized { val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]] if (region2 != null) region2 else clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath) @@ -116,6 +114,10 @@ private[connector] class DefaultGemFireConnection ( } } +private[connector] object DefaultGemFireConnection { + /** a lock object only used by getRegionProxy...() */ + private val regionLock = new Object +} /** The purpose of this class is making unit test DefaultGemFireConnectionManager easier */ class DefaultGemFireConnectionFactory { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc1c1559/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala index 0463340..7495c60 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala @@ -23,17 +23,27 @@ object DefaultGemFireConnectionManager { private[connector] val connections = mutable.Map[(String, Int), GemFireConnection]() /** - * use locator host:port pair to lookup connection. create new connection and add it - * to `connections` if it does not exists. + * use locator host:port pair to lookup cached connection. create new connection + * and add it to the cache `connections` if it does not exist. */ def getConnection(connConf: GemFireConnectionConf) (implicit factory: DefaultGemFireConnectionFactory = new DefaultGemFireConnectionFactory): GemFireConnection = { - val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null) - if (conns.nonEmpty) conns(0) + + def getCachedConnection(locators: Seq[(String, Int)]): GemFireConnection = { + val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null) + if (conns.nonEmpty) conns(0) else null + } + + val conn1 = getCachedConnection(connConf.locators) + if (conn1 != null) conn1 else connections.synchronized { - val conn = factory.newConnection(connConf.locators, connConf.gemfireProps) - connConf.locators.foreach(pair => connections += (pair -> conn)) - conn + val conn2 = getCachedConnection(connConf.locators) + if (conn2 != null) conn2 + else { + val conn3 = factory.newConnection(connConf.locators, connConf.gemfireProps) + connConf.locators.foreach(pair => connections += (pair -> conn3)) + conn3 + } } }