[GitHub] spark pull request: [SPARK-4631][streaming] Wait for a receiver to...

2015-01-30 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/4270#discussion_r23831606
  
--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -121,8 +130,18 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
         val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
         message.setQos(1)
         message.setRetained(true)
+
+        def tryPublish(times: Int): Unit =
+          if (times > 0) try {
+            msgTopic.publish(message)
+          } catch {
+            case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+              Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
+              tryPublish(times - 1)
+          }
+
         for (i <- 0 to 100) {
-          msgTopic.publish(message)
+          tryPublish(3)
--- End diff --

Please try this as well: along with waitForReceiverToStart(), change the for-loop range to 0 to 9, just for testing.
I am getting this error only when I change MqttDefaultFilePersistence to MemoryPersistence; applying the proposed solution above solved the problem for me.
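
The bounded-retry pattern used by tryPublish above can be sketched in isolation. This is a minimal stand-alone version: RetryableException and publishOnce are illustrative stand-ins (not the Paho API) for MqttException with REASON_CODE_MAX_INFLIGHT and for msgTopic.publish, so the sketch runs without a broker.

```scala
// Illustrative stand-in for MqttException with REASON_CODE_MAX_INFLIGHT.
class RetryableException extends RuntimeException

var attempts = 0

// Stand-in for msgTopic.publish: fails twice with a retryable error, then succeeds.
def publishOnce(): Unit = {
  attempts += 1
  if (attempts < 3) throw new RetryableException
}

// Same shape as tryPublish in the diff: at most `times` attempts,
// backing off briefly before each retry.
def tryPublish(times: Int): Unit =
  if (times > 0) try {
    publishOnce()
  } catch {
    case _: RetryableException =>
      Thread.sleep(50) // give the consumer time to drain the in-flight queue
      tryPublish(times - 1)
  }

tryPublish(3) // succeeds on the third attempt
```

The retry budget bounds how long the test can stall if the receiver never drains the queue, which is why a fixed count is used rather than retrying forever.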




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4631][streaming] Wait for a receiver to...

2015-01-30 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/4270#discussion_r23828884
  
--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -121,8 +130,18 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
         val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
         message.setQos(1)
         message.setRetained(true)
+
+        def tryPublish(times: Int): Unit =
+          if (times > 0) try {
+            msgTopic.publish(message)
+          } catch {
+            case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+              Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
+              tryPublish(times - 1)
+          }
+
         for (i <- 0 to 100) {
-          msgTopic.publish(message)
+          tryPublish(3)
--- End diff --

I think the waitForReceiverToStart() method is sufficient. Once the receiver has started, since the QoS is set to 1, the message will be flushed from the queue as soon as the receiver receives it. If the problem still exists, the Thread.sleep in the catch block can solve it.





[GitHub] spark pull request: [SPARK-4631][streaming] Wait for a receiver to...

2015-01-30 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/4270#discussion_r23828369
  
--- Diff: external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala ---
@@ -103,7 +97,15 @@ class MQTTReceiver(
       }
     }
 
-    // Set up callback for MqttClient
+    // Set up callback for MqttClient. This needs to happen before
+    // connecting or subscribing, otherwise messages may be lost
     client.setCallback(callback)
--- End diff --

I agree with this modification.
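
The race this modification fixes can be shown with a toy client. All names here are illustrative stand-ins, not the Paho API: the toy client delivers a pending retained message immediately on connect(), and anything delivered while no callback is installed is silently dropped.

```scala
import scala.collection.mutable.ListBuffer

// Toy stand-in for an MQTT client (illustrative, not the Paho API).
// Messages delivered while no callback is set are lost, which models
// what can happen to messages that arrive as soon as the client connects.
class ToyClient {
  private var callback: String => Unit = null
  private val pending = ListBuffer("retained-msg") // delivered on connect

  def setCallback(cb: String => Unit): Unit = callback = cb

  def connect(): Unit = {
    pending.foreach { m => if (callback != null) callback(m) } // dropped if null
    pending.clear()
  }
}

val received = ListBuffer[String]()

// Wrong order: connect first, then install the callback -> message is lost.
val bad = new ToyClient
bad.connect()
bad.setCallback(received += _)
val lostCount = received.size // nothing was captured

// Right order (what the patch enforces): install the callback, then connect.
val good = new ToyClient
good.setCallback(received += _)
good.connect() // the pending message is now captured
```

Installing the callback before connect() closes the window in which the broker can push messages that have nowhere to go.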





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-12 Thread Bilna
Github user Bilna commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-69694551
  
OK, I will look into it.





[GitHub] spark pull request: [Minor] Fix import order and other coding styl...

2015-01-08 Thread Bilna
GitHub user Bilna opened a pull request:

https://github.com/apache/spark/pull/3966

[Minor] Fix import order and other coding style



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Bilna/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3966


commit 86164950acfc794c6c9b1db3663716ac4626c55b
Author: bilna 
Date:   2014-12-30T13:06:09Z

[SPARK-4631] unit test for MQTT

commit 5ca66918455f04d748d41b98b9967218a1ab4d00
Author: Bilna P 
Date:   2014-12-30T13:24:33Z

Update MQTTStreamSuite.scala

commit e8b6623e5bd31fcb583fdeae5f1c954be672403d
Author: Bilna P 
Date:   2014-12-30T14:42:19Z

Update MQTTStreamSuite.scala

commit 5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5
Author: bilna 
Date:   2014-12-31T06:37:24Z

Added BeforeAndAfter

commit b1ac4ad62ff6d537f669699d5da49bc4ee1ab154
Author: bilna 
Date:   2014-12-31T06:54:41Z

Added BeforeAndAfter

commit 4b580943de5137e947d1a6cdadd054020932ed8e
Author: Bilna P 
Date:   2014-12-31T07:03:44Z

Update MQTTStreamSuite.scala

commit fc8eb286db6aa8e78a567537996011f554eed969
Author: bilna 
Date:   2014-12-31T09:42:51Z

Merge remote-tracking branch 'upstream/master'

commit 89d804e420b05d56a9cd476a58822446662c33b8
Author: bilna 
Date:   2015-01-02T10:49:58Z

Merge remote-tracking branch 'upstream/master'

commit 04503cfa7f8168038c17198b6e45b16b89591e74
Author: bilna 
Date:   2015-01-02T11:40:42Z

Added embedded broker service for mqtt test

commit 4b34ee784e7c9c489cf0c22d73311c160bc67c47
Author: Bilna P 
Date:   2015-01-02T11:45:43Z

Update MQTTStreamSuite.scala

commit ed9db4c0bc0eabc32c4a83ee2953238e382a8632
Author: bilna 
Date:   2015-01-03T05:00:39Z

Merge remote-tracking branch 'upstream/master'

commit fac3904a8e702722acca2a0e7217c5440ecda84a
Author: bilna 
Date:   2015-01-03T08:16:27Z

Correction in Indentation and coding style

commit 28681fa2383c864924e0e1bde3ac47a5ca3a9840
Author: bilna 
Date:   2015-01-04T06:57:05Z

Merge remote-tracking branch 'upstream/master'

commit acea3a31eba9d0853cb7484a16f8916219057be0
Author: bilna 
Date:   2015-01-04T18:22:28Z

Adding dependency with scope test

commit ae56514639915147dd9f25c7e5d37adbfdace929
Author: bilna 
Date:   2015-01-08T15:39:39Z

Merge remote-tracking branch 'upstream/master'

commit 5718d669413d23a8e26658efd447cf323a73c702
Author: bilna 
Date:   2015-01-09T05:00:04Z

Merge remote-tracking branch 'upstream/master'

commit 5e76f04f978124bc49a335c49cb252cb787b2706
Author: bilna 
Date:   2015-01-09T05:28:17Z

fix import order and other coding style







[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-05 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22507984
  
--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -17,31 +17,111 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
--- End diff --

OK, thanks.



[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-05 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22453522
  
--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -17,31 +17,111 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
--- End diff --

Can you explain what the correction is here? Just to understand what went wrong.


[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-05 Thread Bilna
Github user Bilna commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68686800
  
@tdas, Thanks 






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-01 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22405599
  
--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -17,31 +17,65 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
-
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Seconds(1)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val brokerUrl = "tcp://localhost:1883"
--- End diff --

TCP/IP port 1883 is registered with IANA for use with MQTT. That is why I hardcoded it.
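
Hardcoding 1883 can still collide with a broker already running on the build machine, which is why the suite later moved to a dynamically chosen port. A minimal sketch using only the JDK (independent of Spark's Utils.startServiceOnPort helper) is to bind port 0 and let the OS pick an ephemeral free port:

```scala
import java.net.ServerSocket

// Ask the OS for an ephemeral port by binding port 0, then release it.
// Note: there is a small race between closing this socket and the broker
// binding the port, which is one reason the suite retries via
// Utils.startServiceOnPort instead of relying on a single probe.
def findFreePort(): Int = {
  val socket = new ServerSocket(0) // 0 = any free port chosen by the OS
  try socket.getLocalPort
  finally socket.close()
}

val port = findFreePort()
```

The port is then interpolated into the broker URI, so parallel test runs on the same host do not contend for a single well-known port.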





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread Bilna
GitHub user Bilna opened a pull request:

https://github.com/apache/spark/pull/3844

[SPARK-4631] unit test for MQTT



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Bilna/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3844


commit 86164950acfc794c6c9b1db3663716ac4626c55b
Author: bilna 
Date:   2014-12-30T13:06:09Z

[SPARK-4631] unit test for MQTT



