Repository: spark
Updated Branches:
  refs/heads/master 29fabb1b5 -> b4d0db80a


[SPARK-4873][Streaming] Use `Future.zip` instead of `Future.flatMap`(for-loop) in WriteAheadLogBasedBlockHandler

Use `Future.zip` instead of `Future.flatMap`(for-loop). `zip` implies these two Futures will run concurrently, while `flatMap` usually means one Future depends on the other one.

Author: zsxwing <zsxw...@gmail.com>

Closes #3721 from zsxwing/SPARK-4873 and squashes the following commits:

46a2cd9 [zsxwing] Use Future.zip instead of Future.flatMap(for-loop)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4d0db80
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4d0db80
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4d0db80

Branch: refs/heads/master
Commit: b4d0db80a0bfba7f1e045d4edb9357b4b2c0a557
Parents: 29fabb1
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed Dec 24 19:49:41 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Dec 24 19:49:41 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4d0db80/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index c0670e2..8b97db8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -187,10 +187,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     }
 
     // Combine the futures, wait for both to complete, and return the write ahead log segment
-    val combinedFuture = for {
-      _ <- storeInBlockManagerFuture
-      fileSegment <- storeInWriteAheadLogFuture
-    } yield fileSegment
+    val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
     val segment = Await.result(combinedFuture, blockStoreTimeout)
     WriteAheadLogBasedStoreResult(blockId, segment)
   }
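The commit message contrasts `zip` (two independent Futures) with `flatMap` (one Future depending on another). The sketch below is not code from Spark; it is a minimal illustration of that distinction using only `scala.concurrent`. The names `ZipVsFlatMapSketch`, `storeInLog`, `run`, `dependent` and `zipped` are hypothetical, and a `Promise` stands in for a slow first store so the behaviour can be observed without timing. It shows the case where the second Future is created inside the combining expression: in the for-comprehension it is not started until the first Future completes, while as an argument to `zip` it is started right away.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipVsFlatMapSketch {
  // Hypothetical stand-in for the write ahead log store: flips a flag when kicked off.
  def storeInLog(started: AtomicBoolean): Future[String] = {
    started.set(true)
    Future.successful("segment-0")
  }

  // Combines a still-pending first future with a lazily created second one.
  // Returns (was the second store started while the first was pending?, final result).
  def run(combine: (Future[Unit], () => Future[String]) => Future[String]): (Boolean, String) = {
    val gate = Promise[Unit]()             // stands in for a slow block manager store
    val started = new AtomicBoolean(false)
    val combined = combine(gate.future, () => storeInLog(started))
    val startedEarly = started.get()       // sampled while the first future is still pending
    gate.success(())                       // now let the first future complete
    (startedEarly, Await.result(combined, 5.seconds))
  }

  // for-comprehension (flatMap): the second future is only created after the first completes.
  def dependent: (Boolean, String) =
    run((a, mkB) => for { _ <- a; s <- mkB() } yield s)

  // zip: the argument is evaluated eagerly, so the second future starts immediately.
  def zipped: (Boolean, String) =
    run((a, mkB) => a.zip(mkB()).map(_._2))

  def main(args: Array[String]): Unit = {
    println(s"for-comprehension: $dependent")
    println(s"zip:               $zipped")
  }
}
```

If both Futures have already been created before they are combined, as the names `storeInBlockManagerFuture` and `storeInWriteAheadLogFuture` in the diff suggest, the two forms should yield the same segment and the change mainly makes the independence of the two stores explicit.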