Repository: spark
Updated Branches:
  refs/heads/master c4fd2a242 -> 853809e94


[SPARK-5155] [PYSPARK] [STREAMING] MQTT streaming support in Python

This PR is based on #4229; thanks prabeesh.

Closes #4229

Author: Prabeesh K <prabsma...@gmail.com>
Author: zsxwing <zsxw...@gmail.com>
Author: prabs <prabsma...@gmail.com>
Author: Prabeesh K <prabees...@namshi.com>

Closes #7833 from zsxwing/pr4229 and squashes the following commits:

9570bec [zsxwing] Fix the variable name and check null in finally
4a9c79e [zsxwing] Fix pom.xml indentation
abf5f18 [zsxwing] Merge branch 'master' into pr4229
935615c [zsxwing] Fix the flaky MQTT tests
47278c5 [zsxwing] Include the project class files
478f844 [zsxwing] Add unpack
5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT tests
734db99 [zsxwing] Merge branch 'master' into pr4229
126608a [Prabeesh K] address the comments
b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229
d07f454 [zsxwing] Register StreamingListener before starting StreamingContext; revert unnecessary changes; fix the Python unit test
a6747cb [Prabeesh K] wait for starting the receiver before publishing data
87fc677 [Prabeesh K] address the comments
97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt
80474d1 [Prabeesh K] fix
1f0cfe9 [Prabeesh K] python style fix
e1ee016 [Prabeesh K] scala style fix
a5a8f9f [Prabeesh K] added Python test
9767d82 [Prabeesh K] implemented Python-friendly class
a11968b [Prabeesh K] fixed python style
795ec27 [Prabeesh K] address comments
ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly
3f4df12 [Prabeesh K] updated version
b34c3c1 [prabs] address comments
3aa7fff [prabs] Added Python streaming mqtt word count example
b7d42ff [prabs] MQTT streaming support in Python


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/853809e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/853809e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/853809e9

Branch: refs/heads/master
Commit: 853809e948e7c5092643587a30738115b6591a59
Parents: c4fd2a2
Author: Prabeesh K <prabsma...@gmail.com>
Authored: Mon Aug 10 16:33:23 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 10 16:33:23 2015 -0700

----------------------------------------------------------------------
 dev/run-tests.py                                |   2 +
 dev/sparktestsupport/modules.py                 |   2 +
 docs/streaming-programming-guide.md             |   2 +-
 .../src/main/python/streaming/mqtt_wordcount.py |  58 +++++++++
 external/mqtt-assembly/pom.xml                  | 102 ++++++++++++++++
 external/mqtt/pom.xml                           |  28 +++++
 external/mqtt/src/main/assembly/assembly.xml    |  44 +++++++
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  16 +++
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  | 118 +++----------------
 .../spark/streaming/mqtt/MQTTTestUtils.scala    | 111 +++++++++++++++++
 pom.xml                                         |   1 +
 project/SparkBuild.scala                        |  12 +-
 python/pyspark/streaming/mqtt.py                |  72 +++++++++++
 python/pyspark/streaming/tests.py               | 106 ++++++++++++++++-
 14 files changed, 565 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index d1852b9..f689425 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -303,6 +303,8 @@ def build_spark_sbt(hadoop_version):
                  "assembly/assembly",
                  "streaming-kafka-assembly/assembly",
                  "streaming-flume-assembly/assembly",
+                 "streaming-mqtt-assembly/assembly",
+                 "streaming-mqtt/test:assembly",
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
 

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index a9717ff..d82c0cc 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -181,6 +181,7 @@ streaming_mqtt = Module(
     dependencies=[streaming],
     source_file_regexes=[
         "external/mqtt",
+        "external/mqtt-assembly",
     ],
     sbt_test_goals=[
         "streaming-mqtt/test",
@@ -306,6 +307,7 @@ pyspark_streaming = Module(
         streaming,
         streaming_kafka,
         streaming_flume_assembly,
+        streaming_mqtt,
         streaming_kinesis_asl
     ],
     source_file_regexes=[

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index dbfdb61..c59d936 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
 {:.no_toc}
 
 <span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
-out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.
+out of these sources, *only* Kafka, Flume and MQTT are available in the Python API. We will add more advanced sources in the Python API in future.
 
 This category of sources require interfacing with external non-Spark libraries, some of them with
 complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/examples/src/main/python/streaming/mqtt_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py
new file mode 100644
index 0000000..617ce5e
--- /dev/null
+++ b/examples/src/main/python/streaming/mqtt_wordcount.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ A sample wordcount with MqttStream stream
+ Usage: mqtt_wordcount.py <broker url> <topic>
+
+ To run this in your local machine, you need to setup a MQTT broker and 
publisher first,
+ Mosquitto is one of the open source MQTT Brokers, see
+ http://mosquitto.org/
+ Eclipse paho project provides number of clients and utilities for working 
with MQTT, see
+ http://www.eclipse.org/paho/#getting-started
+
+ and then run the example
+    `$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\
+      spark-streaming-mqtt-assembly-*.jar 
examples/src/main/python/streaming/mqtt_wordcount.py \
+      tcp://localhost:1883 foo`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.mqtt import MQTTUtils
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingMQTTWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    brokerUrl = sys.argv[1]
+    topic = sys.argv[2]
+
+    lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    counts = lines.flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/external/mqtt-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml
new file mode 100644
index 0000000..9c94473
--- /dev/null
+++ b/external/mqtt-assembly/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External MQTT Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>reference.conf</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                  <resource>log4j.properties</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/external/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 0e41e57..69b3098 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -78,5 +78,33 @@
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+    <plugins>
+      <!-- Assemble a jar with test dependencies for Python tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test-jar-with-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <!-- Make sure the file path is same as the sbt build -->
+              <finalName>spark-streaming-mqtt-test-${project.version}</finalName>
+              <outputDirectory>${project.build.directory}/scala-${scala.binary.version}/</outputDirectory>
+              <appendAssemblyId>false</appendAssemblyId>
+              <!-- Don't publish it since it's only for Python tests -->
+              <attach>false</attach>
+              <descriptors>
+                <descriptor>src/main/assembly/assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/external/mqtt/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/assembly/assembly.xml b/external/mqtt/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..ecab5b3
--- /dev/null
+++ b/external/mqtt/src/main/assembly/assembly.xml
@@ -0,0 +1,44 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly>
+  <id>test-jar-with-dependencies</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
+      <outputDirectory>/</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+  <dependencySets>
+    <dependencySet>
+      <useTransitiveDependencies>true</useTransitiveDependencies>
+      <scope>test</scope>
+      <unpack>true</unpack>
+      <excludes>
+        <exclude>org.apache.hadoop:*:jar</exclude>
+        <exclude>org.apache.zookeeper:*:jar</exclude>
+        <exclude>org.apache.avro:*:jar</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+
+</assembly>

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 1142d0f..38a1114 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -74,3 +74,19 @@ object MQTTUtils {
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }
 }
+
+/**
+ * This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
+ * function so that it can be easily instantiated and called from Python's MQTTUtils.
+ */
+private class MQTTUtilsPythonHelper {
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel
+    ): JavaDStream[String] = {
+    MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
+  }
+}
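
A minimal sketch of how this helper is reached from Python over Py4J, mirroring
python/pyspark/streaming/mqtt.py later in this diff (assumes `ssc` is an
existing pyspark StreamingContext; the broker URL and topic are placeholders):

    # Sketch only; see MQTTUtils.createStream in python/pyspark/streaming/mqtt.py.
    from pyspark.storagelevel import StorageLevel

    loader = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
    helper_class = loader.loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
    helper = helper_class.newInstance()
    # createStream returns a JavaDStream[String] of MQTT message payloads.
    jlevel = ssc._sc._getJavaStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
    jstream = helper.createStream(ssc._jssc, "tcp://localhost:1883", "foo", jlevel)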

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index c4bf5aa..a6a9249 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -17,46 +17,30 @@
 
 package org.apache.spark.streaming.mqtt
 
-import java.net.{URI, ServerSocket}
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.apache.activemq.broker.{TransportConnector, BrokerService}
-import org.apache.commons.lang3.RandomUtils
-import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.scheduler.StreamingListener
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.util.Utils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
 class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
 
   private val batchDuration = Milliseconds(500)
   private val master = "local[2]"
   private val framework = this.getClass.getSimpleName
-  private val freePort = findFreePort()
-  private val brokerUri = "//localhost:" + freePort
   private val topic = "def"
-  private val persistenceDir = Utils.createTempDir()
 
   private var ssc: StreamingContext = _
-  private var broker: BrokerService = _
-  private var connector: TransportConnector = _
+  private var mqttTestUtils: MQTTTestUtils = _
 
   before {
     ssc = new StreamingContext(master, framework, batchDuration)
-    setupMQTT()
+    mqttTestUtils = new MQTTTestUtils
+    mqttTestUtils.setup()
   }
 
   after {
@@ -64,14 +48,17 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
       ssc.stop()
       ssc = null
     }
-    Utils.deleteRecursively(persistenceDir)
-    tearDownMQTT()
+    if (mqttTestUtils != null) {
+      mqttTestUtils.teardown()
+      mqttTestUtils = null
+    }
   }
 
   test("mqtt input stream") {
     val sendMessage = "MQTT demo for spark streaming"
-    val receiveStream =
-      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + mqttTestUtils.brokerUri, topic,
+      StorageLevel.MEMORY_ONLY)
+
     @volatile var receiveMessage: List[String] = List()
     receiveStream.foreachRDD { rdd =>
       if (rdd.collect.length > 0) {
@@ -79,89 +66,14 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
         receiveMessage
       }
     }
-    ssc.start()
 
-    // wait for the receiver to start before publishing data, or we risk failing
-    // the test nondeterministically. See SPARK-4631
-    waitForReceiverToStart()
+    ssc.start()
 
-    publishData(sendMessage)
+    // Retry it because we don't know when the receiver will start.
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      mqttTestUtils.publishData(topic, sendMessage)
       assert(sendMessage.equals(receiveMessage(0)))
     }
     ssc.stop()
   }
-
-  private def setupMQTT() {
-    broker = new BrokerService()
-    broker.setDataDirectoryFile(Utils.createTempDir())
-    connector = new TransportConnector()
-    connector.setName("mqtt")
-    connector.setUri(new URI("mqtt:" + brokerUri))
-    broker.addConnector(connector)
-    broker.start()
-  }
-
-  private def tearDownMQTT() {
-    if (broker != null) {
-      broker.stop()
-      broker = null
-    }
-    if (connector != null) {
-      connector.stop()
-      connector = null
-    }
-  }
-
-  private def findFreePort(): Int = {
-    val candidatePort = RandomUtils.nextInt(1024, 65536)
-    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
-      val socket = new ServerSocket(trialPort)
-      socket.close()
-      (null, trialPort)
-    }, new SparkConf())._2
-  }
-
-  def publishData(data: String): Unit = {
-    var client: MqttClient = null
-    try {
-      val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
-      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
-      client.connect()
-      if (client.isConnected) {
-        val msgTopic = client.getTopic(topic)
-        val message = new MqttMessage(data.getBytes("utf-8"))
-        message.setQos(1)
-        message.setRetained(true)
-
-        for (i <- 0 to 10) {
-          try {
-            msgTopic.publish(message)
-          } catch {
-            case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
-              // wait for Spark streaming to consume something from the message queue
-              Thread.sleep(50)
-          }
-        }
-      }
-    } finally {
-      client.disconnect()
-      client.close()
-      client = null
-    }
-  }
-
-  /**
-   * Block until at least one receiver has started or timeout occurs.
-   */
-  private def waitForReceiverToStart() = {
-    val latch = new CountDownLatch(1)
-    ssc.addStreamingListener(new StreamingListener {
-      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
-        latch.countDown()
-      }
-    })
-
-    assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
new file mode 100644
index 0000000..1a371b7
--- /dev/null
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import java.net.{ServerSocket, URI}
+
+import scala.language.postfixOps
+
+import com.google.common.base.Charsets.UTF_8
+import org.apache.activemq.broker.{BrokerService, TransportConnector}
+import org.apache.commons.lang3.RandomUtils
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * Share codes for Scala and Python unit tests
+ */
+private class MQTTTestUtils extends Logging {
+
+  private val persistenceDir = Utils.createTempDir()
+  private val brokerHost = "localhost"
+  private val brokerPort = findFreePort()
+
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
+
+  def brokerUri: String = {
+    s"$brokerHost:$brokerPort"
+  }
+
+  def setup(): Unit = {
+    broker = new BrokerService()
+    broker.setDataDirectoryFile(Utils.createTempDir())
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt://" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  def teardown(): Unit = {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+  }
+
+  private def findFreePort(): Int = {
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    }, new SparkConf())._2
+  }
+
+  def publishData(topic: String, data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp://" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic = client.getTopic(topic)
+        val message = new MqttMessage(data.getBytes(UTF_8))
+        message.setQos(1)
+        message.setRetained(true)
+
+        for (i <- 0 to 10) {
+          try {
+            msgTopic.publish(message)
+          } catch {
+            case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+              // wait for Spark streaming to consume something from the message queue
+          }
+          }
+        }
+      }
+    } finally {
+      if (client != null) {
+        client.disconnect()
+        client.close()
+        client = null
+      }
+    }
+  }
+
+}
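
For orientation, roughly how the Python suite further below drives this utility
through Py4J (a hedged sketch; `ssc` stands in for the StreamingContext the
test harness creates, and the topic/payload strings are placeholders):

    # Sketch only; mirrors MQTTStreamTests.setUp/tearDown in python/pyspark/streaming/tests.py.
    loader = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
    test_utils = loader.loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils").newInstance()
    test_utils.setup()                              # starts an in-process ActiveMQ broker
    try:
        broker = test_utils.brokerUri()             # "host:port", to be prefixed with "tcp://"
        test_utils.publishData("topic-1", "hello")  # may send duplicates (QoS 1 retry loop)
    finally:
        test_utils.teardown()                       # stops the broker, deletes persistence dir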

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2bcc55b..8942836 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
     <module>external/flume-sink</module>
     <module>external/flume-assembly</module>
     <module>external/mqtt</module>
+    <module>external/mqtt-assembly</module>
     <module>external/zeromq</module>
     <module>examples</module>
     <module>repl</module>

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9a33baa..41a85fa 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -45,8 +45,8 @@ object BuildCommons {
     sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
     "kinesis-asl").map(ProjectRef(buildLocation, _))
 
-  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) =
-    Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly")
+  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingMqttAssembly, streamingKinesisAslAssembly) =
+    Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-mqtt-assembly", "streaming-kinesis-asl-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val tools = ProjectRef(buildLocation, "tools")
@@ -212,6 +212,9 @@ object SparkBuild extends PomBuild {
   /* Enable Assembly for all assembly projects */
   assemblyProjects.foreach(enable(Assembly.settings))
 
+  /* Enable Assembly for streamingMqtt test */
+  enable(inConfig(Test)(Assembly.settings))(streamingMqtt)
+
   /* Package pyspark artifacts in a separate zip file for YARN. */
   enable(PySparkAssembly.settings)(assembly)
 
@@ -382,13 +385,16 @@ object Assembly {
         .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
-      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
+      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-mqtt-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
         // This must match the same name used in maven (see external/kafka-assembly/pom.xml)
         s"${mName}-${v}.jar"
       } else {
         s"${mName}-${v}-hadoop${hv}.jar"
       }
     },
+    jarName in (Test, assembly) <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
+      s"${mName}-test-${v}.jar"
+    },
     mergeStrategy in assembly := {
       case PathList("org", "datanucleus", xs @ _*)             => MergeStrategy.discard
       case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/python/pyspark/streaming/mqtt.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
new file mode 100644
index 0000000..f065989
--- /dev/null
+++ b/python/pyspark/streaming/mqtt.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_gateway import Py4JJavaError
+
+from pyspark.storagelevel import StorageLevel
+from pyspark.serializers import UTF8Deserializer
+from pyspark.streaming import DStream
+
+__all__ = ['MQTTUtils']
+
+
+class MQTTUtils(object):
+
+    @staticmethod
+    def createStream(ssc, brokerUrl, topic,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+        """
+        Create an input stream that pulls messages from a Mqtt Broker.
+        :param ssc:  StreamingContext object
+        :param brokerUrl:  Url of remote mqtt publisher
+        :param topic:  topic name to subscribe to
+        :param storageLevel:  RDD storage level.
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+
+        try:
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                MQTTUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+
+        return DStream(jstream, ssc, UTF8Deserializer())
+
+    @staticmethod
+    def _printErrorMsg(sc):
+        print("""
+________________________________________________________________________________________________
+
+  Spark Streaming's MQTT libraries not found in class path. Try one of the following.
+
+  1. Include the MQTT library and its dependencies with in the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-mqtt-assembly.jar> ...
+________________________________________________________________________________________________
+""" % (sc.version, sc.version))

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5cd544b..66ae334 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -40,6 +40,7 @@ from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
 from pyspark.streaming.flume import FlumeUtils
+from pyspark.streaming.mqtt import MQTTUtils
 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
 
 
@@ -893,6 +894,68 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
         self._testMultipleTimes(self._testFlumePollingMultipleHosts)
 
 
+class MQTTStreamTests(PySparkStreamingTestCase):
+    timeout = 20  # seconds
+    duration = 1
+
+    def setUp(self):
+        super(MQTTStreamTests, self).setUp()
+
+        MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+            .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
+        self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+        self._MQTTTestUtils.setup()
+
+    def tearDown(self):
+        if self._MQTTTestUtils is not None:
+            self._MQTTTestUtils.teardown()
+            self._MQTTTestUtils = None
+
+        super(MQTTStreamTests, self).tearDown()
+
+    def _randomTopic(self):
+        return "topic-%d" % random.randint(0, 10000)
+
+    def _startContext(self, topic):
+        # Start the StreamingContext and also collect the result
+        stream = MQTTUtils.createStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic)
+        result = []
+
+        def getOutput(_, rdd):
+            for data in rdd.collect():
+                result.append(data)
+
+        stream.foreachRDD(getOutput)
+        self.ssc.start()
+        return result
+
+    def test_mqtt_stream(self):
+        """Test the Python MQTT stream API."""
+        sendData = "MQTT demo for spark streaming"
+        topic = self._randomTopic()
+        result = self._startContext(topic)
+
+        def retry():
+            self._MQTTTestUtils.publishData(topic, sendData)
+            # Because "publishData" sends duplicate messages, here we should use > 0
+            self.assertTrue(len(result) > 0)
+            self.assertEqual(sendData, result[0])
+
+        # Retry it because we don't know when the receiver will start.
+        self._retry_or_timeout(retry)
+
+    def _retry_or_timeout(self, test_func):
+        start_time = time.time()
+        while True:
+            try:
+                test_func()
+                break
+            except:
+                if time.time() - start_time > self.timeout:
+                    raise
+                time.sleep(0.01)
+
+
 class KinesisStreamTests(PySparkStreamingTestCase):
 
     def test_kinesis_stream_api(self):
@@ -985,7 +1048,42 @@ def search_flume_assembly_jar():
             "'build/mvn package' before running this test")
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Flume assembly JARs 
in %s; please "
-                         "remove all but one") % flume_assembly_dir)
+                        "remove all but one") % flume_assembly_dir)
+    else:
+        return jars[0]
+
+
+def search_mqtt_assembly_jar():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly")
+    jars = glob.glob(
+        os.path.join(mqtt_assembly_dir, "target/scala-*/spark-streaming-mqtt-assembly-*.jar"))
+    if not jars:
+        raise Exception(
+            ("Failed to find Spark Streaming MQTT assembly jar in %s. " % 
mqtt_assembly_dir) +
+            "You need to build Spark with "
+            "'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or 
"
+            "'build/mvn package' before running this test")
+    elif len(jars) > 1:
+        raise Exception(("Found multiple Spark Streaming MQTT assembly JARs in 
%s; please "
+                         "remove all but one") % mqtt_assembly_dir)
+    else:
+        return jars[0]
+
+
+def search_mqtt_test_jar():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    mqtt_test_dir = os.path.join(SPARK_HOME, "external/mqtt")
+    jars = glob.glob(
+        os.path.join(mqtt_test_dir, "target/scala-*/spark-streaming-mqtt-test-*.jar"))
+    if not jars:
+        raise Exception(
+            ("Failed to find Spark Streaming MQTT test jar in %s. " % 
mqtt_test_dir) +
+            "You need to build Spark with "
+            "'build/sbt assembly/assembly streaming-mqtt/test:assembly'")
+    elif len(jars) > 1:
+        raise Exception(("Found multiple Spark Streaming MQTT test JARs in %s; 
please "
+                         "remove all but one") % mqtt_test_dir)
     else:
         return jars[0]
 
@@ -1012,8 +1110,12 @@ def search_kinesis_asl_assembly_jar():
 if __name__ == "__main__":
     kafka_assembly_jar = search_kafka_assembly_jar()
     flume_assembly_jar = search_flume_assembly_jar()
+    mqtt_assembly_jar = search_mqtt_assembly_jar()
+    mqtt_test_jar = search_mqtt_test_jar()
     kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
-    jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, 
kinesis_asl_assembly_jar)
+
+    jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, 
kinesis_asl_assembly_jar,
+                               mqtt_assembly_jar, mqtt_test_jar)
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     unittest.main()

