[CARBONDATA-2408] Fix search mode master SaslException issue on first startup
This closes #2239 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fb128974 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fb128974 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fb128974 Branch: refs/heads/spark-2.3 Commit: fb12897474f8eb9b4ca04764f9ee15890573b057 Parents: 452c42b Author: xubo245 <601450...@qq.com> Authored: Fri Apr 27 18:57:37 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed May 9 11:02:48 2018 +0800 ---------------------------------------------------------------------- .../carbondata/store/SparkCarbonStore.scala | 1 + .../scala/org/apache/spark/rpc/Master.scala | 24 ++++++++++++++++++++ 2 files changed, 25 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb128974/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala index c0d0d09..3a6adea 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala @@ -110,6 +110,7 @@ class SparkCarbonStore extends MetaCachedCarbonStore { } def startSearchMode(): Unit = { + LOG.info("Starting search mode master") master = new Master(session.sparkContext.getConf) master.startService() startAllWorkers() http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb128974/store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala 
b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala index df793b4..5b31a49 100644 --- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala +++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala @@ -20,6 +20,7 @@ package org.apache.spark.rpc import java.io.IOException import java.net.{BindException, InetAddress} import java.util.{List => JList, Map => JMap, Objects, Random, UUID} +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -67,6 +68,8 @@ class Master(sparkConf: SparkConf) { /** start service and listen on port passed in constructor */ def startService(): Unit = { if (rpcEnv == null) { + LOG.info("Start search mode master thread") + val isStarted: AtomicBoolean = new AtomicBoolean(false) new Thread(new Runnable { override def run(): Unit = { val hostAddress = InetAddress.getLocalHost.getHostAddress @@ -96,10 +99,31 @@ class Master(sparkConf: SparkConf) { } val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this) rpcEnv.setupEndpoint("registry-service", registryEndpoint) + if (isStarted.compareAndSet(false, false)) { + synchronized { + isStarted.compareAndSet(false, true) + } + } LOG.info("registry-service started") rpcEnv.awaitTermination() } }).start() + var count = 0 + val countThreshold = 5000 + while (isStarted.compareAndSet(false, false) && count < countThreshold) { + LOG.info(s"Waiting search mode master to start, retrying $count times") + Thread.sleep(10) + count = count + 1; + } + if (count >= countThreshold) { + LOG.error(s"Search mode try $countThreshold times to start master but failed") + throw new RuntimeException( + s"Search mode try $countThreshold times to start master but failed") + } else { + LOG.info("Search mode master started") + } + } else { + LOG.info("Search mode master has already started") } }