Subscribe
- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
subscribe
subscribe spark
subscribe spark
subscribe to spark commits
subscribe to spark commits
spark git commit: [SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates
Repository: spark Updated Branches: refs/heads/branch-1.5 3137628bc - 3997dd3fd [SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates In some receivers, instead of using the default `BlockGenerator` in `ReceiverSupervisorImpl`, custom generator with their custom listeners are used for reliability (see [`ReliableKafkaReceiver`](https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L99) and [updated `KinesisReceiver`](https://github.com/apache/spark/pull/7825/files)). These custom generators do not receive rate updates. This PR modifies the code to allow custom `BlockGenerator`s to be created through the `ReceiverSupervisorImpl` so that they can be kept track and rate updates can be applied. In the process, I did some simplification, and de-flaki-fication of some rate controller related tests. In particular. - Renamed `Receiver.executor` to `Receiver.supervisor` (to match `ReceiverSupervisor`) - Made `RateControllerSuite` faster (by increasing batch interval) and less flaky - Changed a few internal API to return the current rate of block generators as Long instead of Option\[Long\] (was inconsistent at places). - Updated existing `ReceiverTrackerSuite` to test that custom block generators get rate updates as well. Author: Tathagata Das tathagata.das1...@gmail.com Closes #7913 from tdas/SPARK-9556 and squashes the following commits: 41d4461 [Tathagata Das] fix scala style eb9fd59 [Tathagata Das] Updated kinesis receiver d24994d [Tathagata Das] Updated BlockGeneratorSuite to use manual clock in BlockGenerator d70608b [Tathagata Das] Updated BlockGenerator with states and proper synchronization f6bd47e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9556 31da173 [Tathagata Das] Fix bug 12116df [Tathagata Das] Add BlockGeneratorSuite 74bd069 [Tathagata Das] Fix style 989bb5c [Tathagata Das] Made BlockGenerator fail is used after stop, and added better unit tests for it 3ff618c [Tathagata Das] Fix test b40eff8 [Tathagata Das] slight refactoring f0df0f1 [Tathagata Das] Scala style fixes 51759cb [Tathagata Das] Refactored rate controller tests and added the ability to update rate of any custom block generator (cherry picked from commit 0a078303d08ad2bb92b9a8a6969563d75b512290) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3997dd3f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3997dd3f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3997dd3f Branch: refs/heads/branch-1.5 Commit: 3997dd3fde0f1f67ddc4941921a8ce1449bb44f0 Parents: 3137628 Author: Tathagata Das tathagata.das1...@gmail.com Authored: Thu Aug 6 14:35:30 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Aug 6 14:36:55 2015 -0700 -- .../org/apache/spark/util/ManualClock.scala | 2 +- .../streaming/kafka/ReliableKafkaReceiver.scala | 2 +- .../streaming/kinesis/KinesisReceiver.scala | 2 +- .../streaming/receiver/ActorReceiver.scala | 8 +- .../streaming/receiver/BlockGenerator.scala | 131 +++--- .../spark/streaming/receiver/RateLimiter.scala | 3 +- .../spark/streaming/receiver/Receiver.scala | 52 ++-- .../streaming/receiver/ReceiverSupervisor.scala | 27 +- .../receiver/ReceiverSupervisorImpl.scala | 33 ++- .../spark/streaming/CheckpointSuite.scala | 16 +- .../apache/spark/streaming/ReceiverSuite.scala | 31 +-- .../receiver/BlockGeneratorSuite.scala | 253 +++ .../scheduler/RateControllerSuite.scala | 64 ++--- .../scheduler/ReceiverTrackerSuite.scala| 129 +- 14 files changed, 534 insertions(+), 219 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3997dd3f/core/src/main/scala/org/apache/spark/util/ManualClock.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala index 1718554..e7a65d7 100644 --- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala +++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -58,7 +58,7 @@ private[spark] class ManualClock(private var time: Long) extends Clock { */ def waitTillTime(targetTime: Long): Long = synchronized { while (time targetTime) { - wait(100) + wait(10) } getTimeMillis() } http://git-wip-us.apache.org/repos/asf/spark/blob/3997dd3f/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
spark git commit: [SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates
Repository: spark Updated Branches: refs/heads/master 21fdfd7d6 - 0a078303d [SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates In some receivers, instead of using the default `BlockGenerator` in `ReceiverSupervisorImpl`, custom generator with their custom listeners are used for reliability (see [`ReliableKafkaReceiver`](https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L99) and [updated `KinesisReceiver`](https://github.com/apache/spark/pull/7825/files)). These custom generators do not receive rate updates. This PR modifies the code to allow custom `BlockGenerator`s to be created through the `ReceiverSupervisorImpl` so that they can be kept track and rate updates can be applied. In the process, I did some simplification, and de-flaki-fication of some rate controller related tests. In particular. - Renamed `Receiver.executor` to `Receiver.supervisor` (to match `ReceiverSupervisor`) - Made `RateControllerSuite` faster (by increasing batch interval) and less flaky - Changed a few internal API to return the current rate of block generators as Long instead of Option\[Long\] (was inconsistent at places). - Updated existing `ReceiverTrackerSuite` to test that custom block generators get rate updates as well. Author: Tathagata Das tathagata.das1...@gmail.com Closes #7913 from tdas/SPARK-9556 and squashes the following commits: 41d4461 [Tathagata Das] fix scala style eb9fd59 [Tathagata Das] Updated kinesis receiver d24994d [Tathagata Das] Updated BlockGeneratorSuite to use manual clock in BlockGenerator d70608b [Tathagata Das] Updated BlockGenerator with states and proper synchronization f6bd47e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9556 31da173 [Tathagata Das] Fix bug 12116df [Tathagata Das] Add BlockGeneratorSuite 74bd069 [Tathagata Das] Fix style 989bb5c [Tathagata Das] Made BlockGenerator fail is used after stop, and added better unit tests for it 3ff618c [Tathagata Das] Fix test b40eff8 [Tathagata Das] slight refactoring f0df0f1 [Tathagata Das] Scala style fixes 51759cb [Tathagata Das] Refactored rate controller tests and added the ability to update rate of any custom block generator Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a078303 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a078303 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a078303 Branch: refs/heads/master Commit: 0a078303d08ad2bb92b9a8a6969563d75b512290 Parents: 21fdfd7 Author: Tathagata Das tathagata.das1...@gmail.com Authored: Thu Aug 6 14:35:30 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Aug 6 14:35:30 2015 -0700 -- .../org/apache/spark/util/ManualClock.scala | 2 +- .../streaming/kafka/ReliableKafkaReceiver.scala | 2 +- .../streaming/kinesis/KinesisReceiver.scala | 2 +- .../streaming/receiver/ActorReceiver.scala | 8 +- .../streaming/receiver/BlockGenerator.scala | 131 +++--- .../spark/streaming/receiver/RateLimiter.scala | 3 +- .../spark/streaming/receiver/Receiver.scala | 52 ++-- .../streaming/receiver/ReceiverSupervisor.scala | 27 +- .../receiver/ReceiverSupervisorImpl.scala | 33 ++- .../spark/streaming/CheckpointSuite.scala | 16 +- .../apache/spark/streaming/ReceiverSuite.scala | 31 +-- .../receiver/BlockGeneratorSuite.scala | 253 +++ .../scheduler/RateControllerSuite.scala | 64 ++--- .../scheduler/ReceiverTrackerSuite.scala| 129 +- 14 files changed, 534 insertions(+), 219 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a078303/core/src/main/scala/org/apache/spark/util/ManualClock.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala index 1718554..e7a65d7 100644 --- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala +++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -58,7 +58,7 @@ private[spark] class ManualClock(private var time: Long) extends Clock { */ def waitTillTime(targetTime: Long): Long = synchronized { while (time targetTime) { - wait(100) + wait(10) } getTimeMillis() } http://git-wip-us.apache.org/repos/asf/spark/blob/0a078303/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka