[GitHub] spark pull request: [SPARK-4631][streaming] Wait for a receiver to...
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/4270#discussion_r23831606

--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -121,8 +130,18 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
       message.setQos(1)
       message.setRetained(true)
+
+      def tryPublish(times: Int): Unit =
+        if (times > 0) try {
+          msgTopic.publish(message)
+        } catch {
+          case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+            Thread.sleep(50) // wait for Spark Streaming to consume something from the message queue
+            tryPublish(times - 1)
+        }
+
       for (i <- 0 to 100) {
-        msgTopic.publish(message)
+        tryPublish(3)
--- End diff ---

Please try this as well: along with waitForReceiverToStart(), change the for-loop range to 0 to 9, just for testing. I get this error only if I change MqttDefaultFilePersistence to MemoryPersistence, and I solved the problem by applying the proposed solution above.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
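The tryPublish pattern under discussion generalizes to a bounded retry with a pause between attempts. The following is a minimal pure-Scala sketch of that idea, with no MQTT dependency; the helper name retryOnBackpressure and its parameters are illustrative, not from the PR:

```scala
import scala.util.control.NonFatal

// Sketch of the bounded-retry idea behind tryPublish: attempt a
// side-effecting action up to `times` times, pausing between attempts
// so a slow consumer has a chance to drain its queue.
def retryOnBackpressure[T](times: Int, pauseMs: Long)(action: => T): Option[T] = {
  var attempt = 0
  while (attempt < times) {
    try {
      return Some(action) // success: stop retrying
    } catch {
      case NonFatal(_) =>
        Thread.sleep(pauseMs) // give the consumer time to drain the queue
        attempt += 1
    }
  }
  None // all attempts exhausted, like tryPublish silently giving up
}

// Usage: an action that fails twice, then succeeds on the third attempt.
var calls = 0
val result = retryOnBackpressure(times = 3, pauseMs = 1) {
  calls += 1
  if (calls < 3) throw new RuntimeException("MAX_INFLIGHT") else "published"
}
```

Like the tryPublish in the diff, this version gives up silently after the last attempt, which is acceptable in a test that publishes the same message many times.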
[GitHub] spark pull request: [SPARK-4631][streaming] Wait for a receiver to...
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/4270#discussion_r23828884

--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -121,8 +130,18 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
       message.setQos(1)
       message.setRetained(true)
+
+      def tryPublish(times: Int): Unit =
+        if (times > 0) try {
+          msgTopic.publish(message)
+        } catch {
+          case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+            Thread.sleep(50) // wait for Spark Streaming to consume something from the message queue
+            tryPublish(times - 1)
+        }
+
       for (i <- 0 to 100) {
-        msgTopic.publish(message)
+        tryPublish(3)
--- End diff ---

I think the waitForReceiverToStart() method is sufficient. Once the receiver has started, since QoS is set to 1, the message will be flushed from the queue as soon as the receiver receives it. If the problem still exists, the Thread.sleep in the catch block can solve it.
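The REASON_CODE_MAX_INFLIGHT error discussed here comes from the client's bounded window of unacknowledged QoS 1 publishes. A self-contained toy model (the class name and maxInflight parameter are illustrative; in the real Paho client the limit is a connection option) shows why a fast publisher overruns a slow consumer:

```scala
import scala.collection.mutable

// Toy model of a bounded in-flight window for QoS 1 publishes.
// Not the Paho API; names are illustrative.
class InflightWindow(maxInflight: Int) {
  private val pending = mutable.Queue[String]()

  // Returns false when the window is full, standing in for the
  // MAX_INFLIGHT exception the real client would throw.
  def publish(msg: String): Boolean =
    if (pending.size >= maxInflight) false
    else { pending.enqueue(msg); true }

  // The consumer acknowledging one message frees one window slot.
  def ack(): Unit = if (pending.nonEmpty) pending.dequeue()
}

val window = new InflightWindow(maxInflight = 10)
// Publishing 11 messages with no acknowledgements: the 11th is rejected.
val results = (1 to 11).map(i => window.publish(s"msg-$i"))
// After one ack there is room again, which is why pausing (or retrying)
// until the receiver consumes something resolves the error.
window.ack()
val afterAck = window.publish("msg-12")
```

This is the mechanism the comment relies on: with QoS 1 and a started receiver, acknowledgements flow continuously, so the window rarely fills; the sleep-and-retry in the catch covers the remaining race.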
[GitHub] spark pull request: [SPARK-4631][streaming] Wait for a receiver to...
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/4270#discussion_r23828369

--- Diff: external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala ---
@@ -103,7 +97,15 @@ class MQTTReceiver(
       }
     }

-    // Set up callback for MqttClient
+    // Set up callback for MqttClient. This needs to happen before
+    // connecting or subscribing, otherwise messages may be lost
     client.setCallback(callback)
--- End diff ---

I agree with this modification.
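The reordering agreed to here matters because a callback registered after connect/subscribe can miss deliveries that arrive in between. A self-contained toy client (this is not the Paho API; all names are illustrative) makes the race concrete:

```scala
// Toy model of why setCallback must precede connecting/subscribing.
// Not the Paho API; names are illustrative.
class ToyMqttClient {
  private var callback: Option[String => Unit] = None

  def setCallback(cb: String => Unit): Unit = callback = Some(cb)

  // deliver() stands in for a message arriving as soon as the
  // connection and subscription are live (e.g. a retained message).
  def deliver(msg: String): Unit = callback.foreach(_(msg))
}

val received = scala.collection.mutable.Buffer[String]()

// Wrong order: a retained message arrives before the callback is set
// and is silently dropped.
val bad = new ToyMqttClient
bad.deliver("retained-message") // no callback registered yet: lost
bad.setCallback(received += _)

// Right order: register the callback first, then every delivery lands.
val good = new ToyMqttClient
good.setCallback(received += _)
good.deliver("retained-message")
```

With retained messages and QoS 1 the broker may push data immediately on subscribe, so the window between connect and setCallback is a real, if narrow, loss window.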
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-69694551

OK, I will look into it.
[GitHub] spark pull request: [Minor] Fix import order and other coding styl...
GitHub user Bilna opened a pull request: https://github.com/apache/spark/pull/3966 [Minor] Fix import order and other coding style

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Bilna/spark master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3966.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3966

commit 86164950acfc794c6c9b1db3663716ac4626c55b (bilna, 2014-12-30T13:06:09Z): [SPARK-4631] unit test for MQTT
commit 5ca66918455f04d748d41b98b9967218a1ab4d00 (Bilna P, 2014-12-30T13:24:33Z): Update MQTTStreamSuite.scala
commit e8b6623e5bd31fcb583fdeae5f1c954be672403d (Bilna P, 2014-12-30T14:42:19Z): Update MQTTStreamSuite.scala
commit 5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5 (bilna, 2014-12-31T06:37:24Z): Added BeforeAndAfter
commit b1ac4ad62ff6d537f669699d5da49bc4ee1ab154 (bilna, 2014-12-31T06:54:41Z): Added BeforeAndAfter
commit 4b580943de5137e947d1a6cdadd054020932ed8e (Bilna P, 2014-12-31T07:03:44Z): Update MQTTStreamSuite.scala
commit fc8eb286db6aa8e78a567537996011f554eed969 (bilna, 2014-12-31T09:42:51Z): Merge remote-tracking branch 'upstream/master'
commit 89d804e420b05d56a9cd476a58822446662c33b8 (bilna, 2015-01-02T10:49:58Z): Merge remote-tracking branch 'upstream/master'
commit 04503cfa7f8168038c17198b6e45b16b89591e74 (bilna, 2015-01-02T11:40:42Z): Added embedded broker service for mqtt test
commit 4b34ee784e7c9c489cf0c22d73311c160bc67c47 (Bilna P, 2015-01-02T11:45:43Z): Update MQTTStreamSuite.scala
commit ed9db4c0bc0eabc32c4a83ee2953238e382a8632 (bilna, 2015-01-03T05:00:39Z): Merge remote-tracking branch 'upstream/master'
commit fac3904a8e702722acca2a0e7217c5440ecda84a (bilna, 2015-01-03T08:16:27Z): Correction in Indentation and coding style
commit 28681fa2383c864924e0e1bde3ac47a5ca3a9840 (bilna, 2015-01-04T06:57:05Z): Merge remote-tracking branch 'upstream/master'
commit acea3a31eba9d0853cb7484a16f8916219057be0 (bilna, 2015-01-04T18:22:28Z): Adding dependency with scope test
commit ae56514639915147dd9f25c7e5d37adbfdace929 (bilna, 2015-01-08T15:39:39Z): Merge remote-tracking branch 'upstream/master'
commit 5718d669413d23a8e26658efd447cf323a73c702 (bilna, 2015-01-09T05:00:04Z): Merge remote-tracking branch 'upstream/master'
commit 5e76f04f978124bc49a335c49cb252cb787b2706 (bilna, 2015-01-09T05:28:17Z): fix import order and other coding style
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22507984

--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -17,31 +17,111 @@
 package org.apache.spark.streaming.mqtt

-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}

-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-  private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _

-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }

-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }

-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(1 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
--- End diff ---

OK, thanks.
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22453522

--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -17,31 +17,111 @@
 package org.apache.spark.streaming.mqtt

-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}

-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-  private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _

-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }

-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }

-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(1 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
--- End diff ---

Can you explain what the correction is here? Just to understand what went wrong.
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68686800

@tdas, Thanks
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22405599

--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -17,31 +17,65 @@
 package org.apache.spark.streaming.mqtt

-import org.scalatest.FunSuite
-
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
+  private val batchDuration = Seconds(1)
   private val master: String = "local[2]"
-  private val framework: String = this.getClass.getSimpleName
+  private val brokerUrl = "tcp://localhost:1883"
--- End diff ---

TCP/IP port 1883 is reserved with IANA for use with MQTT. That is why I hardcoded it.
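Precisely because 1883 is the IANA-registered MQTT port, a real broker may already be listening on it, so a test that binds it can collide with the host environment; the later revision of this suite switched to finding a free port. A minimal JVM-only sketch of that idea, using a plain ServerSocket rather than Spark's Utils.startServiceOnPort helper:

```scala
import java.net.ServerSocket

// Ask the OS for a free ephemeral port instead of hardcoding 1883,
// so the test broker never collides with a real MQTT broker.
def findFreePort(): Int = {
  val socket = new ServerSocket(0) // port 0: the OS picks any free port
  try socket.getLocalPort          // remember the port that was assigned
  finally socket.close()           // release it for the broker to bind
}

val port = findFreePort()
```

There is a small race (another process could grab the port between close() and the broker binding it), which is why retry-based helpers like Utils.startServiceOnPort exist; for a single-host unit test the simple version is usually adequate.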
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
GitHub user Bilna opened a pull request: https://github.com/apache/spark/pull/3844 [SPARK-4631] unit test for MQTT

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Bilna/spark master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3844.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3844

commit 86164950acfc794c6c9b1db3663716ac4626c55b (bilna, 2014-12-30T13:06:09Z): [SPARK-4631] unit test for MQTT