Repository: spark Updated Branches: refs/heads/master 678b96e77 -> b0f5497e9
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 0379957..28aed0c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -65,14 +65,12 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( // Try to get the previous state RDD getOrCompute(validTime - slideDuration) match { - case Some(prevStateRDD) => { // If previous state RDD exists - + case Some(prevStateRDD) => // If previous state RDD exists // Try to get the parent RDD parent.getOrCompute(validTime) match { - case Some(parentRDD) => { // If parent RDD exists, then compute as usual + case Some(parentRDD) => // If parent RDD exists, then compute as usual computeUsingPreviousRDD(parentRDD, prevStateRDD) - } - case None => { // If parent RDD does not exist + case None => // If parent RDD does not exist // Re-apply the update function to the old state RDD val updateFuncLocal = updateFunc @@ -82,17 +80,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( } val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) Some(stateRDD) - } } - } - - case None => { // If previous session RDD does not exist (first input data) + case None => // If previous session RDD does not exist (first input data) // Try to get the parent RDD parent.getOrCompute(validTime) match { - case Some(parentRDD) => { // If parent RDD exists, then compute as usual + case Some(parentRDD) => // If parent RDD exists, then compute as usual initialRDD match { - case None => { + case None => // Define the function for the mapPartition operation on grouped RDD; // first map the grouped tuple to tuples of required type, // and then apply the update function @@ -105,18 +100,13 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) // logDebug("Generating state RDD for time " + validTime + " (first)") Some(sessionRDD) - } - case Some(initialStateRDD) => { + case Some(initialStateRDD) => computeUsingPreviousRDD(parentRDD, initialStateRDD) - } } - } - case None => { // If parent RDD does not exist, then nothing to do! + case None => // If parent RDD does not exist, then nothing to do! // logDebug("Not generating state RDD (no previous state, no parent)") None - } } - } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index bd60059..cfcbdc7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -538,10 +538,9 @@ class BasicOperationsSuite extends TestSuiteBase { val stateObj = state.getOrElse(new StateObject) values.sum match { case 0 => stateObj.expireCounter += 1 // no new values - case n => { // has new values, increment and reset expireCounter + case n => // has new values, increment and reset expireCounter stateObj.counter += n stateObj.expireCounter = 0 - } } stateObj.expireCounter match { case 2 => None // seen twice with no new values, give it the boot http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index fbb25d4..bdbac64 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -267,10 +267,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") stateStream.checkpointData.currentCheckpointFiles.foreach { - case (time, file) => { + case (time, file) => assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist") - } } // Run till a further time such that previous checkpoint files in the stream would be deleted @@ -297,10 +296,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") stateStream.checkpointData.currentCheckpointFiles.foreach { - case (time, file) => { + case (time, file) => assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist") - } } ssc.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index 29bee4a..60c8e70 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -382,11 +382,10 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) fs.rename(tempHadoopFile, hadoopFile) done = true } catch { - case ioe: IOException => { + case ioe: IOException => fs = testDir.getFileSystem(new Configuration()) logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) - } } } if (!done) { http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9e84534..d447a59 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -374,7 +374,7 @@ private[spark] class ApplicationMaster( failureCount = 0 } catch { case i: InterruptedException => - case e: Throwable => { + case e: Throwable => failureCount += 1 // this exception was introduced in hadoop 2.4 and this code would not compile // with earlier versions if we refer it directly. @@ -390,7 +390,6 @@ private[spark] class ApplicationMaster( } else { logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e) } - } } try { val numPendingAllocate = allocator.getPendingAllocate.size http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index b0bfe85..23742ea 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -148,11 +148,10 @@ private[yarn] class YarnAllocator( classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean], classOf[String])) } catch { - case e: NoSuchMethodException => { + case e: NoSuchMethodException => logWarning(s"Node label expression $expr will be ignored because YARN version on" + " classpath does not support it.") None - } } } http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 8720ee5..6b3c831 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -223,17 +223,15 @@ private[spark] abstract class YarnSchedulerBackend( val lossReasonRequest = GetExecutorLossReason(executorId) val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout) future onSuccess { - case reason: ExecutorLossReason => { + case reason: ExecutorLossReason => driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason)) - } } future onFailure { - case NonFatal(e) => { + case NonFatal(e) => logWarning(s"Attempted to get executor loss reason" + s" for executor id ${executorId} at RPC address ${executorRpcAddress}," + s" but got no response. Marking as slave lost.", e) driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost())) - } case t => throw t } case None => http://git-wip-us.apache.org/repos/asf/spark/blob/b0f5497e/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index de14e36..fe09808 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -101,22 +101,18 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) viewAcls match { - case Some(vacls) => { + case Some(vacls) => val aclSet = vacls.split(',').map(_.trim).toSet assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { + case None => fail() - } } modifyAcls match { - case Some(macls) => { + case Some(macls) => val aclSet = macls.split(',').map(_.trim).toSet assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { + case None => fail() - } } } @@ -135,26 +131,22 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) viewAcls match { - case Some(vacls) => { + case Some(vacls) => val aclSet = vacls.split(',').map(_.trim).toSet assert(aclSet.contains("user1")) assert(aclSet.contains("user2")) assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { + case None => fail() - } } modifyAcls match { - case Some(macls) => { + case Some(macls) => val aclSet = macls.split(',').map(_.trim).toSet assert(aclSet.contains("user3")) assert(aclSet.contains("user4")) assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { + case None => fail() - } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org