Repository: spark
Updated Branches:
  refs/heads/master b293afc42 -> 85cf06368


[SPARK-5559] [Streaming] [Test] Reduce the chance of flakiness when running 
FlumeStreamSuite

When we run FlumeStreamSuite on Jenkins, we sometimes get an error like the following.

    sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 52 times over 10.094849836 seconds. Last failure message: Error connecting to localhost/127.0.0.1:23456.
            at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
            at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
            at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
            at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
            at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
            at org.apache.spark.streaming.flume.FlumeStreamSuite.writeAndVerify(FlumeStreamSuite.scala:116)
            at org.apache.spark.streaming.flume.FlumeStreamSuite.org$apache$spark$streaming$flume$FlumeStreamSuite$$testFlumeStream(FlumeStreamSuite.scala:74)
            at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply$mcV$sp(FlumeStreamSuite.scala:66)
            at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
            at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
            at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
            at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
            at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
            at org.scalatest.Transformer.apply(Transformer.scala:22)
            at org.scalatest.Transformer.apply(Transformer.scala:20)
            at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
            at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
            at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
            at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
            at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
            at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
            at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
            at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)

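This error is caused by the check-then-act logic used to find a free port: the 
port is probed and then released, so another process can claim it before the 
test actually binds to it.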

      /** Find a free port */
      private def findFreePort(): Int = {
        Utils.startServiceOnPort(23456, (trialPort: Int) => {
          val socket = new ServerSocket(trialPort)
          socket.close()
          (null, trialPort)
        }, conf)._2
      }
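
The race window can be reproduced deterministically. The following is only an
illustrative sketch, not code from this commit: the object `CheckThenActRace`
and its methods `probe` and `raceLost` are hypothetical names, and the
"intruder" socket stands in for whatever other process might grab the port on
a busy Jenkins host.

```scala
import java.net.{BindException, ServerSocket}

object CheckThenActRace {
  /** "Check": bind to the port and release it immediately, as findFreePort does. */
  def probe(port: Int): Int = {
    val socket = new ServerSocket(port)
    try socket.getLocalPort finally socket.close()
  }

  /** Returns true if the later bind fails because the port was taken in between. */
  def raceLost(): Boolean = {
    val port = probe(0)                    // port 0 asks the OS for any free port
    val intruder = new ServerSocket(port)  // someone else binds inside the window
    try {
      new ServerSocket(port).close()       // "act": the real service tries to bind
      false
    } catch {
      case _: BindException => true        // the port reported as free is now in use
    } finally {
      intruder.close()
    }
  }
}
```

Because the probe releases the port before the real service binds, nothing
reserves the port in between; the sketch simply forces the unlucky interleaving.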

Removing the check-then-act is not easy, but we can reduce the chance of hitting 
the error by choosing a random value for the initial port instead of 23456.
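
As a rough, self-contained sketch of the idea (not Spark's actual
`Utils.startServiceOnPort`, and using `scala.util.Random` instead of the
`RandomUtils` call in the diff), a random start port plus a bounded retry could
look like this. The object `RandomPortProbe`, its methods, and the default of
16 retries are assumptions made for illustration only.

```scala
import java.io.IOException
import java.net.{BindException, ServerSocket}
import scala.util.Random

object RandomPortProbe {
  val MinPort = 1024
  val MaxPort = 65535
  private val Span = MaxPort - MinPort + 1

  /** Pick a start port uniformly from [1024, 65535]. */
  def randomStartPort(rng: Random = new Random): Int =
    MinPort + rng.nextInt(Span)

  /** True if the port could be bound (and was released again) just now. */
  private def isFree(port: Int): Boolean =
    try { new ServerSocket(port).close(); true }
    catch { case _: IOException => false }

  /** Probe startPort and the ports after it, wrapping around within the valid range. */
  def findFreePort(startPort: Int = randomStartPort(), maxRetries: Int = 16): Int = {
    require(MinPort <= startPort && startPort <= MaxPort,
      s"startPort should be between $MinPort and $MaxPort (inclusive).")
    Iterator
      .tabulate(maxRetries + 1)(i => MinPort + (startPort - MinPort + i) % Span)
      .find(isFree)
      .getOrElse(throw new BindException(s"No free port found from $startPort"))
  }
}
```

This still probes and releases the port, so the race remains possible; the
random start only makes it less likely that concurrent test runs on the same
host all contend for the same fixed port.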

Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>

Closes #4337 from sarutak/SPARK-5559 and squashes the following commits:

16f109f [Kousuke Saruta] Added `require` to Utils#startServiceOnPort
c39d8b6 [Kousuke Saruta] Merge branch 'SPARK-5559' of github.com:sarutak/spark into SPARK-5559
1610ba2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
33357e3 [Kousuke Saruta] Changed "findFreePort" method in MQTTStreamSuite and FlumeStreamSuite so that it can choose valid random port
a9029fe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
9489ef9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
8212e42 [Kousuke Saruta] Modified default port used in FlumeStreamSuite from 23456 to random value


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85cf0636
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85cf0636
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85cf0636

Branch: refs/heads/master
Commit: 85cf0636825d1997d64d0bdc04618f29b7222da1
Parents: b293afc
Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>
Authored: Tue Mar 24 16:13:25 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Mar 24 16:20:52 2015 +0000

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/util/Utils.scala           | 4 ++++
 .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala     | 5 +++--
 .../scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 4 +++-
 3 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/85cf0636/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d9a6716..0b5a914 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1876,6 +1876,10 @@ private[spark] object Utils extends Logging {
       startService: Int => (T, Int),
       conf: SparkConf,
       serviceName: String = ""): (T, Int) = {
+
+    require(startPort == 0 || (1024 <= startPort && startPort < 65536),
+      "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
+
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
     val maxRetries = portMaxRetries(conf)
     for (offset <- 0 to maxRetries) {

http://git-wip-us.apache.org/repos/asf/spark/blob/85cf0636/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 322de7b..51d273a 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -28,6 +28,7 @@ import scala.language.postfixOps
 import com.google.common.base.Charsets
 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
 import org.apache.flume.source.avro
 import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.jboss.netty.channel.ChannelPipeline
@@ -40,7 +41,6 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
 import org.apache.spark.util.Utils
 
 class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -76,7 +76,8 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
 
   /** Find a free port */
   private def findFreePort(): Int = {
-    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/85cf0636/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 0f3298a..24d78ec 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -25,6 +25,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.commons.lang3.RandomUtils
 import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
@@ -113,7 +114,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
   }
 
   private def findFreePort(): Int = {
-    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)

