[ https://issues.apache.org/jira/browse/BAHIR-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703987#comment-16703987 ]
ASF GitHub Bot commented on BAHIR-66:
-------------------------------------
GitHub user lukasz-antoniak commented on a diff in the pull request:
https://github.com/apache/bahir/pull/71#discussion_r237691415
--- Diff: streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala ---
@@ -15,105 +15,123 @@
* limitations under the License.
*/
-// scalastyle:off println awaitresult
package org.apache.spark.examples.streaming.zeromq
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
+import scala.util.Random
-import akka.actor.ActorSystem
-import akka.actor.actorRef2Scala
-import akka.util.ByteString
-import akka.zeromq._
-import akka.zeromq.Subscribe
import org.apache.log4j.{Level, Logger}
+import org.zeromq.ZContext
+import org.zeromq.ZMQ
+import org.zeromq.ZMQException
+import org.zeromq.ZMsg
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.zeromq._
+import org.apache.spark.streaming.zeromq.ZeroMQUtils
/**
- * A simple publisher for demonstration purposes, repeatedly publishes
random Messages
- * every one second.
+ * Simple publisher for demonstration purposes,
+ * repeatedly publishes random messages every one second.
*/
object SimpleZeroMQPublisher {
-
def main(args: Array[String]): Unit = {
if (args.length < 2) {
- System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic>
")
+ // scalastyle:off println
+ System.err.println("Usage: SimpleZeroMQPublisher <zeroMqUrl>
<topic>")
+ // scalastyle:on println
System.exit(1)
}
val Seq(url, topic) = args.toSeq
- val acs: ActorSystem = ActorSystem()
-
- val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub,
Bind(url))
- implicit def stringToByteString(x: String): ByteString = ByteString(x)
- val messages: List[ByteString] = List("words ", "may ", "count ")
- while (true) {
- Thread.sleep(1000)
- pubSocket ! ZMQMessage(ByteString(topic) :: messages)
- }
- Await.result(acs.whenTerminated, Duration.Inf)
+ val context = new ZContext
+ val socket = context.createSocket(ZMQ.PUB)
+ socket.bind(url)
+
+ val zmqThread = new Thread(new Runnable {
+ def run() {
+ val messages = List("words", "may", "count infinitely")
+ val random = new Random
+ while (!Thread.currentThread.isInterrupted) {
+ try {
+ Thread.sleep(random.nextInt(1000))
+ val msg1 = new ZMsg
+ msg1.add(topic.getBytes)
+ msg1.add(messages(random.nextInt(messages.size)).getBytes)
+ msg1.send(socket)
+ } catch {
+ case e: ZMQException if ZMQ.Error.ETERM.getCode ==
e.getErrorCode =>
+ Thread.currentThread.interrupt()
+ case e: InterruptedException =>
+ case e: Throwable => throw e
+ }
+ }
+ }
+ })
+
+ sys.addShutdownHook( {
+ context.destroy()
+ zmqThread.interrupt()
+ zmqThread.join()
+ } )
+
+ zmqThread.start()
}
}
-// scalastyle:off
/**
- * A sample wordcount with ZeroMQStream stream
- *
- * To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
- * (http://www.zeromq.org/intro:get-the-software)
+ * Sample word count with ZeroMQ stream.
*
- * Usage: ZeroMQWordCount <zeroMQurl> <topic>
- * <zeroMQurl> and <topic> describe where zeroMq publisher is running.
+ * Usage: ZeroMQWordCount <zeroMqUrl> <topic>
+ * <zeroMqUrl> describes where ZeroMQ publisher is running
+ * <topic> defines logical message type
*
- * To run this example locally, you may run publisher as
+ * To run this example locally, you may start publisher as:
* `$ bin/run-example \
* org.apache.spark.examples.streaming.zeromq.SimpleZeroMQPublisher
tcp://127.0.0.1:1234 foo`
- * and run the example as
+ * and run the example as:
* `$ bin/run-example \
* org.apache.spark.examples.streaming.zeromq.ZeroMQWordCount
tcp://127.0.0.1:1234 foo`
*/
-// scalastyle:on
object ZeroMQWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>")
+ // scalastyle:off println
+ System.err.println("Usage: ZeroMQWordCount <zeroMqUrl> <topic>")
+ // scalastyle:on println
System.exit(1)
}
- // Set logging level if log4j not configured (override by adding
log4j.properties to classpath)
- if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
- Logger.getRootLogger.setLevel(Level.WARN)
- }
+ // Set logging level if log4j not configured (override by adding
log4j.properties to classpath).
+ Logger.getRootLogger.setLevel(Level.WARN)
val Seq(url, topic) = args.toSeq
val sparkConf = new SparkConf().setAppName("ZeroMQWordCount")
- // check Spark configuration for master URL, set it to local if not
configured
+ // Check Spark configuration for master URL, set it to local if not
present.
if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[2]")
}
- // Create the context and set the batch size
+ // Create the context and set the batch size.
val ssc = new StreamingContext(sparkConf, Seconds(2))
- def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] =
x.map(_.utf8String).iterator
+ def bytesToString(bytes: Array[Array[Byte]]) = {
+ Seq(new String(bytes(1), zmq.ZMQ.CHARSET))
--- End diff --
Added a test message converter supporting the most common scenario.
> Add test that ZeroMQ streaming connector can receive data
> ---------------------------------------------------------
>
> Key: BAHIR-66
> URL: https://issues.apache.org/jira/browse/BAHIR-66
> Project: Bahir
> Issue Type: Sub-task
> Components: Spark Streaming Connectors
> Reporter: Christian Kadner
> Priority: Major
> Labels: test
>
> Add test cases that verify that the *ZeroMQ streaming connector* can receive
> streaming data.
> See [BAHIR-63|https://issues.apache.org/jira/browse/BAHIR-63]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)