GEODE-137: use local GemFire server to initialize LocalCache whenever possible.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2e2a795d Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2e2a795d Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2e2a795d Branch: refs/heads/develop Commit: 2e2a795db57e21cc83784f0b3111b0b5e355ad87 Parents: 7d4ae09 Author: Qihong Chen <qc...@pivotal.io> Authored: Wed Jul 29 10:02:26 2015 -0700 Committer: Qihong Chen <qc...@pivotal.io> Committed: Thu Aug 6 10:44:36 2015 -0700 ---------------------------------------------------------------------- .../connector/GemFirePairRDDFunctions.scala | 5 +- .../spark/connector/GemFireRDDFunctions.scala | 5 +- .../internal/DefaultGemFireConnection.scala | 30 +++++-- .../connector/internal/LocatorHelper.scala | 91 +++++++++++++++++++- .../internal/rdd/GemFireRegionRDD.scala | 2 +- .../gemfire/spark/connector/package.scala | 7 ++ .../spark/connector/LocatorHelperTest.scala | 77 +++++++++++++++++ 7 files changed, 208 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala index 86ec596..8050a5e 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala @@ -23,7 +23,10 @@ 
class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable w connConf: GemFireConnectionConf = defaultConnectionConf, opConf: Map[String, String] = Map.empty): Unit = { connConf.getConnection.validateRegion[K, V](regionPath) - logInfo(s"Save RDD id=${rdd.id} to region $regionPath") + if (log.isDebugEnabled) + logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""") + else + logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""") val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf) rdd.sparkContext.runJob(rdd, writer.write _) } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala index 3aa1ebd..5415727 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala @@ -25,7 +25,10 @@ class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging connConf: GemFireConnectionConf = defaultConnectionConf, opConf: Map[String, String] = Map.empty): Unit = { connConf.getConnection.validateRegion[K, V](regionPath) - logInfo(s"Save RDD id=${rdd.id} to region $regionPath") + if (log.isDebugEnabled) + logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""") + else + logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""") val writer = new 
GemFireRDDWriter[T, K, V](regionPath, connConf, opConf) rdd.sparkContext.runJob(rdd, writer.write(func) _) } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala index bba6c69..3fcb496 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala @@ -1,5 +1,7 @@ package io.pivotal.gemfire.spark.connector.internal +import java.net.InetAddress + import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut} import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService} import com.gemstone.gemfire.cache.query.Query @@ -7,7 +9,7 @@ import com.gemstone.gemfire.cache.{Region, RegionService} import com.gemstone.gemfire.internal.cache.execute.InternalExecution import io.pivotal.gemfire.spark.connector.internal.oql.QueryResultCollector import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition -import org.apache.spark.Logging +import org.apache.spark.{SparkEnv, Logging} import io.pivotal.gemfire.spark.connector.GemFireConnection import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions._ import java.util.{Set => JSet, List => JList } @@ -30,10 +32,7 @@ private[connector] class DefaultGemFireConnection ( private def initClientCache() : ClientCache = { try { - import 
io.pivotal.gemfire.spark.connector.map2Properties - logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""") - val ccf = new ClientCacheFactory(gemFireProps) - locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) } + val ccf = getClientCacheFactory ccf.create() } catch { case e: Exception => @@ -41,6 +40,27 @@ private[connector] class DefaultGemFireConnection ( throw new RuntimeException(e) } } + + private def getClientCacheFactory: ClientCacheFactory = { + import io.pivotal.gemfire.spark.connector.map2Properties + val ccf = new ClientCacheFactory(gemFireProps) + ccf.setPoolReadTimeout(30000) + val servers = LocatorHelper.getAllGemFireServers(locators) + if (servers.isDefined && servers.get.size > 0) { + val sparkIp = System.getenv("SPARK_LOCAL_IP") + val hostName = if (sparkIp != null) InetAddress.getByName(sparkIp).getCanonicalHostName + else InetAddress.getLocalHost.getCanonicalHostName + val executorId = SparkEnv.get.executorId + val pickedServers = LocatorHelper.pickPreferredGemFireServers(servers.get, hostName, executorId) + logInfo(s"""Init ClientCache: servers=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""") + logDebug(s"""Init ClientCache: all-servers=${pickedServers.mkString(",")}""") + pickedServers.foreach{ case (host, port) => ccf.addPoolServer(host, port) } + } else { + logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""") + locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) } + } + ccf + } /** close the clientCache */ override def close(): Unit = http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala ---------------------------------------------------------------------- diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala index 550e0bc..a010c62 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala @@ -1,9 +1,17 @@ package io.pivotal.gemfire.spark.connector.internal +import java.net.InetSocketAddress +import java.util.{ArrayList => JArrayList} + +import com.gemstone.gemfire.cache.client.internal.locator.{GetAllServersResponse, GetAllServersRequest} +import com.gemstone.gemfire.distributed.internal.ServerLocation +import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient +import org.apache.spark.Logging + import scala.util.{Failure, Success, Try} -object LocatorHelper { +object LocatorHelper extends Logging { /** valid locator strings are: host[port] and host:port */ final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r @@ -27,4 +35,85 @@ object LocatorHelper { def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] = locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get) + + /** + * Return the list of live GemFire servers for the given locators. 
+ * @param locators locators for the given GemFire cluster + * @param serverGroup optional server group name, default is "" (empty string) + */ + def getAllGemFireServers(locators: Seq[(String, Int)], serverGroup: String = ""): Option[Seq[(String, Int)]] = { + var result: Option[Seq[(String, Int)]] = None + locators.find { case (host, port) => + try { + val addr = new InetSocketAddress(host, port) + val req = new GetAllServersRequest(serverGroup) + val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, req, 2000) + if (res != null) { + import scala.collection.JavaConverters._ + val servers = res.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]] + if (servers.size > 0) + result = Some(servers.asScala.map(e => (e.getHostName, e.getPort))) + } + } catch { case e: Exception => logWarning("getAllGemFireServers error", e) + } + result.isDefined + } + result + } + + /** + * Pick up at most 3 preferred servers from all available servers based on + * host name and Spark executor id. + * + * This method is used by DefaultGemFireConnection to create LocalCache. Usually + * one server is enough to initialize LocalCacheFactory, but this provides two + * backup servers in case the 1st server can't be connected. + * + * @param servers all available servers in the form of (hostname, port) pairs + * @param hostName the host name of the Spark executor + * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ... + * @return Seq[(hostname, port)] of preferred servers + */ + def pickPreferredGemFireServers( + servers: Seq[(String, Int)], hostName: String, executorId: String): Seq[(String, Int)] = { + + // pick up `length` items from the Seq starting at the `start` position. 
+ // The Seq is treated as a ring, so at most `Seq.size` items can be picked + def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] = { + val size = math.min(seq.size, length) + (start until start + size).map(x => seq(x % seq.size)) + } + + // map executor id to int: "<driver>" (or non-number string) to 0, and "n" to n + 1 + val id = try { executorId.toInt + 1 } catch { case e: NumberFormatException => 0 } + + // algorithm: + // 1. sort server list + // 2. split sorted server list into 3 sub-lists a, b, and c: + // list-a: servers on the given host + // list-b: servers that are in front of list-a on the sorted server list + // list-c: servers that are behind list-a on the sorted server list + // then rotate list-a based on executor id, then create new server list: + // modified list-a ++ list-c ++ list-b + // 3. if there's no server on the given host, then create new server list + // by rotating sorted server list based on executor id. + // 4. take up to 3 servers from the new server list + val sortedServers = servers.sorted + val firstIdx = sortedServers.indexWhere(p => p._1 == hostName) + val lastIdx = if (firstIdx < 0) -1 else sortedServers.lastIndexWhere(p => p._1 == hostName) + + if (firstIdx < 0) { // no local server + circularTake(sortedServers, id, 3) + } else { + val (seq1, seq2) = sortedServers.splitAt(firstIdx) + val seq = if (firstIdx == lastIdx) { // one local server + seq2 ++ seq1 + } else { // multiple local servers + val (seq3, seq4) = seq2.splitAt(lastIdx - firstIdx + 1) + val seq3b = if (id % seq3.size == 0) seq3 else circularTake(seq3, id, seq3.size) + seq3b ++ seq4 ++ seq1 + } + circularTake(seq, 0, 3) + } + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala ---------------------------------------------------------------------- diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala index cff61d6..3a987b2 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala @@ -82,7 +82,7 @@ class GemFireRegionRDD[K, V] private[connector] logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""") val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner val splits = p.partitions[K, V](conn, data, opConf) - logDebug(s"""RDD id=${this.id} region=$regionPath partitions=${splits.mkString(",")}""") + logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n ${splits.mkString("\n ")}""") splits } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala index 72a5bb1..d08e96c 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala @@ -25,6 +25,8 @@ package object connector { final val RDDSaveBatchSizePropKey = "rdd.save.batch.size" final val RDDSaveBatchSizeDefault = 10000 
+ /** implicits */ + implicit def toSparkContextFunctions(sc: SparkContext): GemFireSparkContextFunctions = new GemFireSparkContextFunctions(sc) @@ -43,4 +45,9 @@ package object connector { implicit def map2Properties(map: Map[String,String]): java.util.Properties = (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} + /** internal util methods */ + + private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n "): String = + rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep) + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala index 508666a..de4b7a7 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala @@ -1,5 +1,7 @@ package unittest.io.pivotal.gemfire.spark.connector +import java.net.InetAddress + import io.pivotal.gemfire.spark.connector.internal.LocatorHelper import org.scalatest.FunSuite @@ -72,4 +74,79 @@ class LocatorHelperTest extends FunSuite { intercept[Exception] { LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") } } + test("pickPreferredGemFireServers: shared servers and one gf-server per host") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) + val servers = Seq(srv1, srv2, 
srv3, srv4) + verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4)) + verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2)) + } + + test("pickPreferredGemFireServers: shared servers, one gf-server per host, un-sorted list") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) + val servers = Seq(srv4, srv2, srv3, srv1) + verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4)) + verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2)) + } + + test("pickPreferredGemFireServers: shared servers and two gf-server per host") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004)) + val servers = Seq(srv1, srv2, srv3, srv4) + verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3)) + verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1)) + } + + test("pickPreferredGemFireServers: shared servers, two gf-server per host, un-sorted server list") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004)) + val servers = Seq(srv1, srv4, srv3, srv2) + verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3)) + verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, 
srv1)) + verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1)) + } + + test("pickPreferredGemFireServers: no shared servers and one gf-server per host") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) + val servers = Seq(srv1, srv2, srv3, srv4) + verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv3, srv4)) + verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv4, srv1, srv2)) + } + + test("pickPreferredGemFireServers: no shared servers, one gf-server per host, and less gf-server") { + val (srv1, srv2) = (("host1", 4001), ("host2", 4002)) + val servers = Seq(srv1, srv2) + verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2)) + verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv1)) + verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv1, srv2)) + verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv2, srv1)) + + + println("host name: " + InetAddress.getLocalHost.getHostName) + println("canonical host name: " + InetAddress.getLocalHost.getCanonicalHostName) + println("canonical host name 2: " + InetAddress.getByName(InetAddress.getLocalHost.getHostName).getCanonicalHostName) + } + + test("pickPreferredGemFireServers: ad-hoc") { + val (srv4, srv5, srv6) = ( + ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 40411), ("w2-gst-pnq-06.gemstone.com", 40411)) + val servers = Seq(srv6, srv5, srv4) + verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-03.gemstone.com", "<driver>", Seq(srv4, srv5, srv6)) + verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-04.gemstone.com", "1", Seq(srv4, srv5, srv6)) + verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-05.gemstone.com", "0", Seq(srv5, srv6, srv4)) + 
verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-06.gemstone.com", "2", Seq(srv6, srv4, srv5)) + } + + def verifyPickPreferredGemFireServers( + servers: Seq[(String, Int)], hostName: String, executorId: String, expectation: Seq[(String, Int)]): Unit = { + val result = LocatorHelper.pickPreferredGemFireServers(servers, hostName, executorId) + assert(result == expectation, s"pick servers for $hostName:$executorId") + } + }