[
https://issues.apache.org/jira/browse/BAHIR-89?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894010#comment-15894010
]
ASF GitHub Bot commented on BAHIR-89:
-------------------------------------
Github user ckadner commented on a diff in the pull request:
https://github.com/apache/bahir/pull/37#discussion_r104118613
--- Diff:
streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
---
@@ -199,7 +199,181 @@ object MQTTUtils {
createStream(jssc.ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
Option(username), Option(password), Option(cleanSession), None,
None, None, None)
}
+ /**
+ * Create an input stream that receives messages pushed by a MQTT
publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to
StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createPairedStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[(String, String)] = {
+ new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT
publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to
StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt
publisher
+ * @param password Password for authentication to the mqtt
publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic
subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]
+ ): ReceiverInputDStream[(String, String)] = {
+ new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel,
clientId, username, password,
+ cleanSession, qos, connectionTimeout, keepAliveInterval,
mqttVersion)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT
publisher.
+ * Storage level of the data will be the default
StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Array of topic names to subscribe to
--- End diff --
should be `@param topics` (plural)
> New API for subscribing from a list of MQTT topics and Return tuple of
> <Topic,Message> as output
> ------------------------------------------------------------------------------------------------
>
> Key: BAHIR-89
> URL: https://issues.apache.org/jira/browse/BAHIR-89
> Project: Bahir
> Issue Type: New Feature
> Components: Spark Streaming Connectors
> Affects Versions: Not Applicable
> Environment: Spark Streaming MQTT Connector
> Reporter: Anntinu Josy
> Labels: MQTT, SPARK
> Fix For: Not Applicable
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I am working in IoT Project. As part of MQTT-Kafka bridge program development
> I used Bahir. I feel that it will be a good feature to prove a new API to
> support a list of MQTT topic as input and output as a tuple of <Topic,
> Message>. This will be useful to reduce resource usage in case of multiple
> topic subscription. I had developed this feature a like to integrate
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)