Updated Branches: refs/heads/master a8cf3ec15 -> 0386f42e3
Merge pull request #529 from hsaputra/cleanup_right_arrowop_scala Change the â character (maybe from scalariform) to => in Scala code for style consistency Looks like there are some â Unicode character (maybe from scalariform) in Scala code. This PR is to change it to => to get some consistency on the Scala code. If we want to use â as default we could use sbt plugin scalariform to make sure all Scala code has â instead of => And remove unused imports found in TwitterInputDStream.scala while I was there =) Author: Henry Saputra <hsapu...@apache.org> == Merge branch commits == commit 29c1771d346dff901b0b778f764e6b4409900234 Author: Henry Saputra <hsapu...@apache.org> Date: Sat Feb 1 22:05:16 2014 -0800 Change the â character (maybe from scalariform) to => in Scala code for style consistency. Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0386f42e Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0386f42e Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0386f42e Branch: refs/heads/master Commit: 0386f42e383dc01b8df33c4a70b024e7902b5fdd Parents: a8cf3ec Author: Henry Saputra <hsapu...@apache.org> Authored: Sun Feb 2 21:51:17 2014 -0800 Committer: Reynold Xin <r...@apache.org> Committed: Sun Feb 2 21:51:17 2014 -0800 ---------------------------------------------------------------------- .../spark/streaming/examples/ActorWordCount.scala | 2 +- .../streaming/twitter/TwitterInputDStream.scala | 5 +---- .../spark/streaming/zeromq/ZeroMQReceiver.scala | 8 ++++---- .../apache/spark/streaming/zeromq/ZeroMQUtils.scala | 2 +- .../spark/streaming/receivers/ActorReceiver.scala | 16 ++++++++-------- 5 files changed, 15 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0386f42e/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index a588881..bc0d163 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -88,7 +88,7 @@ extends Actor with Receiver { override def preStart = remotePublisher ! SubscribeReceiver(context.self) def receive = { - case msg â pushBlock(msg.asInstanceOf[T]) + case msg => pushBlock(msg.asInstanceOf[T]) } override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0386f42e/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 5cc721d..3316b6d 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -17,14 +17,11 @@ package org.apache.spark.streaming.twitter -import java.util.prefs.Preferences import twitter4j._ import twitter4j.auth.Authorization import twitter4j.conf.ConfigurationBuilder -import twitter4j.conf.PropertyConfiguration import twitter4j.auth.OAuthAuthorization -import twitter4j.auth.AccessToken -import org.apache.spark._ + import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.storage.StorageLevel http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0386f42e/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala ---------------------------------------------------------------------- diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala index 769761e..960c6a3 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -31,7 +31,7 @@ import org.apache.spark.streaming.receivers._ */ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, subscribe: Subscribe, - bytesToObjects: Seq[ByteString] â Iterator[T]) + bytesToObjects: Seq[ByteString] => Iterator[T]) extends Actor with Receiver with Logging { override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), @@ -39,16 +39,16 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, def receive: Receive = { - case Connecting â logInfo("connecting ...") + case Connecting => logInfo("connecting ...") - case m: ZMQMessage â + case m: ZMQMessage => logDebug("Received message for:" + m.frame(0)) //We ignore first frame for processing as it is the topic val bytes = m.frames.tail pushBlock(bytesToObjects(bytes)) - case Closed â logInfo("received closed ") + case Closed => logInfo("received closed ") } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0386f42e/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala ---------------------------------------------------------------------- diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 7a14b3d..b47d786 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -46,7 +46,7 @@ object ZeroMQUtils { ssc: StreamingContext, publisherUrl: String, subscribe: Subscribe, - bytesToObjects: Seq[ByteString] â Iterator[T], + bytesToObjects: Seq[ByteString] => Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy ): DStream[T] = { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0386f42e/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index 79ed696..9c5b177 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -37,8 +37,8 @@ object ReceiverSupervisorStrategy { val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 15 millis) { - case _: RuntimeException â Restart - case _: Exception â Escalate + case _: RuntimeException => Restart + case _: Exception => Escalate } } @@ -66,7 +66,7 @@ object ReceiverSupervisorStrategy { */ trait Receiver { - self: Actor â // to ensure that this can be added to Actor classes only + self: Actor => // to ensure that this can be added to Actor classes only /** * Push an iterator received data into Spark Streaming for processing @@ -139,25 +139,25 @@ private[streaming] class ActorReceiver[T: ClassTag]( def receive = { - case Data(iter: Iterator[_]) â pushBlock(iter.asInstanceOf[Iterator[T]]) + case Data(iter: Iterator[_]) => pushBlock(iter.asInstanceOf[Iterator[T]]) - case Data(msg) â + case Data(msg) => blocksGenerator += msg.asInstanceOf[T] n.incrementAndGet - case props: Props â + case props: Props => val worker = context.actorOf(props) logInfo("Started receiver worker at:" + worker.path) sender ! worker - case (props: Props, name: String) â + case (props: Props, name: String) => val worker = context.actorOf(props, name) logInfo("Started receiver worker at:" + worker.path) sender ! worker case _: PossiblyHarmful => hiccups.incrementAndGet() - case _: Statistics â + case _: Statistics => val workers = context.children sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))