Repository: spark
Updated Branches:
  refs/heads/master 678b96e77 -> b0f5497e9


http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 0379957..28aed0c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -65,14 +65,12 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
     // Try to get the previous state RDD
     getOrCompute(validTime - slideDuration) match {
 
-      case Some(prevStateRDD) => {    // If previous state RDD exists
-
+      case Some(prevStateRDD) =>    // If previous state RDD exists
         // Try to get the parent RDD
         parent.getOrCompute(validTime) match {
-          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
+          case Some(parentRDD) =>   // If parent RDD exists, then compute as usual
             computeUsingPreviousRDD(parentRDD, prevStateRDD)
-          }
-          case None => {    // If parent RDD does not exist
+          case None =>    // If parent RDD does not exist
 
             // Re-apply the update function to the old state RDD
             val updateFuncLocal = updateFunc
@@ -82,17 +80,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
             }
             val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
             Some(stateRDD)
-          }
         }
-      }
-
-      case None => {    // If previous session RDD does not exist (first input data)
 
+      case None =>    // If previous session RDD does not exist (first input data)
         // Try to get the parent RDD
         parent.getOrCompute(validTime) match {
-          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
+          case Some(parentRDD) =>   // If parent RDD exists, then compute as usual
             initialRDD match {
-              case None => {
+              case None =>
                 // Define the function for the mapPartition operation on grouped RDD;
                 // first map the grouped tuple to tuples of required type,
                 // and then apply the update function
@@ -105,18 +100,13 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
                 val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
                 // logDebug("Generating state RDD for time " + validTime + " (first)")
                 Some(sessionRDD)
-              }
-              case Some(initialStateRDD) => {
+              case Some(initialStateRDD) =>
                 computeUsingPreviousRDD(parentRDD, initialStateRDD)
-              }
             }
-          }
-          case None => { // If parent RDD does not exist, then nothing to do!
+          case None => // If parent RDD does not exist, then nothing to do!
             // logDebug("Not generating state RDD (no previous state, no 
parent)")
             None
-          }
         }
-      }
     }
   }
 }

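Every hunk in this commit applies the same Scala style cleanup: in a match expression, a case clause's body extends until the next case clause (or the brace that closes the match), so wrapping a multi-line body in its own braces is redundant. A minimal sketch of the before/after style (illustrative only, not code from the patch):

    // Before: redundant braces around the case body
    def describeOld(x: Option[Int]): String = x match {
      case Some(n) => {
        val doubled = n * 2
        s"got $doubled"
      }
      case None => "empty"
    }

    // After: the body simply runs until the next case clause, no braces needed
    def describeNew(x: Option[Int]): String = x match {
      case Some(n) =>
        val doubled = n * 2
        s"got $doubled"
      case None => "empty"
    }

Both versions behave identically; the change is purely stylistic.
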
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index bd60059..cfcbdc7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -538,10 +538,9 @@ class BasicOperationsSuite extends TestSuiteBase {
         val stateObj = state.getOrElse(new StateObject)
         values.sum match {
           case 0 => stateObj.expireCounter += 1 // no new values
-          case n => { // has new values, increment and reset expireCounter
+          case n => // has new values, increment and reset expireCounter
             stateObj.counter += n
             stateObj.expireCounter = 0
-          }
         }
         stateObj.expireCounter match {
           case 2 => None // seen twice with no new values, give it the boot

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index fbb25d4..bdbac64 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -267,10 +267,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
       "No checkpointed RDDs in state stream before first failure")
     stateStream.checkpointData.currentCheckpointFiles.foreach {
-      case (time, file) => {
+      case (time, file) =>
         assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
             " for state stream before first failure does not exist")
-      }
     }
 
     // Run till a further time such that previous checkpoint files in the stream would be deleted
@@ -297,10 +296,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
       "No checkpointed RDDs in state stream before second failure")
     stateStream.checkpointData.currentCheckpointFiles.foreach {
-      case (time, file) => {
+      case (time, file) =>
         assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
           " for state stream before seconds failure does not exist")
-      }
     }
     ssc.stop()
 

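The test-suite hunks above show the same rule inside a partial-function literal: the block passed to foreach is a series of case clauses, and each clause's body already extends to the next case or the closing brace of the block. A hypothetical sketch, with made-up names rather than code from the patch:

    // A partial-function literal: each case body needs no extra braces
    val report: PartialFunction[(Long, String), Unit] = {
      case (time, file) =>
        val msg = s"checkpoint file '$file' for time $time"
        println(msg)
    }
    Seq((1000L, "chk-0"), (2000L, "chk-1")).foreach(report)
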
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 29bee4a..60c8e70 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -382,11 +382,10 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
                 fs.rename(tempHadoopFile, hadoopFile)
             done = true
           } catch {
-            case ioe: IOException => {
+            case ioe: IOException =>
                   fs = testDir.getFileSystem(new Configuration())
                   logWarning("Attempt " + tries + " at generating file " + 
hadoopFile + " failed.",
                     ioe)
-            }
           }
         }
         if (!done) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9e84534..d447a59 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -374,7 +374,7 @@ private[spark] class ApplicationMaster(
             failureCount = 0
           } catch {
             case i: InterruptedException =>
-            case e: Throwable => {
+            case e: Throwable =>
               failureCount += 1
               // this exception was introduced in hadoop 2.4 and this code would not compile
               // with earlier versions if we refer it directly.
@@ -390,7 +390,6 @@ private[spark] class ApplicationMaster(
               } else {
                 logWarning(s"Reporter thread fails $failureCount time(s) in a 
row.", e)
               }
-            }
           }
           try {
             val numPendingAllocate = allocator.getPendingAllocate.size

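The try/catch hunks (MasterFailureTest above, ApplicationMaster, and YarnAllocator below) are covered by the same rule, because a Scala catch block is itself written as a sequence of case clauses. A hypothetical sketch, not taken from the patch:

    import java.io.IOException

    def readOrDefault(read: () => String): String =
      try {
        read()
      } catch {
        case e: IOException =>
          // multi-statement handler body; it ends at the next case or the closing brace
          println(s"read failed: ${e.getMessage}")
          "fallback value"
      }
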
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b0bfe85..23742ea 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -148,11 +148,10 @@ private[yarn] class YarnAllocator(
         classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
         classOf[String]))
     } catch {
-      case e: NoSuchMethodException => {
+      case e: NoSuchMethodException =>
         logWarning(s"Node label expression $expr will be ignored because YARN 
version on" +
           " classpath does not support it.")
         None
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 8720ee5..6b3c831 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -223,17 +223,15 @@ private[spark] abstract class YarnSchedulerBackend(
           val lossReasonRequest = GetExecutorLossReason(executorId)
           val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
           future onSuccess {
-            case reason: ExecutorLossReason => {
+            case reason: ExecutorLossReason =>
               driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
-            }
           }
           future onFailure {
-            case NonFatal(e) => {
+            case NonFatal(e) =>
               logWarning(s"Attempted to get executor loss reason" +
                 s" for executor id ${executorId} at RPC address 
${executorRpcAddress}," +
                 s" but got no response. Marking as slave lost.", e)
               driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
-            }
             case t => throw t
           }
         case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index de14e36..fe09808 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -101,22 +101,18 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
 
     viewAcls match {
-      case Some(vacls) => {
+      case Some(vacls) =>
         val aclSet = vacls.split(',').map(_.trim).toSet
         assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
+      case None =>
         fail()
-      }
     }
     modifyAcls match {
-      case Some(macls) => {
+      case Some(macls) =>
         val aclSet = macls.split(',').map(_.trim).toSet
         assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
+      case None =>
         fail()
-      }
     }
   }
 
@@ -135,26 +131,22 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
 
     viewAcls match {
-      case Some(vacls) => {
+      case Some(vacls) =>
         val aclSet = vacls.split(',').map(_.trim).toSet
         assert(aclSet.contains("user1"))
         assert(aclSet.contains("user2"))
         assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
+      case None =>
         fail()
-      }
     }
     modifyAcls match {
-      case Some(macls) => {
+      case Some(macls) =>
         val aclSet = macls.split(',').map(_.trim).toSet
         assert(aclSet.contains("user3"))
         assert(aclSet.contains("user4"))
         assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
+      case None =>
         fail()
-      }
     }
 
   }

