> On Oct. 1, 2013, 7:59 a.m., Sriram Subramanian wrote:
> > 1. DefaultChooser should really be default. We should not be allowing
> > anything to be configurable within it. The "task.chooser.wrapped.class" is
> > hard to understand. This should just be hard coded to RoundRobin.
> > 
> > 2. BatchingChooser does not seem to work with TieredPriorityChooser. Assume
> > there were three priorities 3, 2, 1 and each priority has a
> > BatchingChooser. When choose gets called the first time, assume there was
> > no data in the priority 3 tier. We would then call choose on the priority 2
> > chooser. In the next loop, if there were new data in the priority 3 tier,
> > we would start choosing that, interrupting the priority 2 batch. I think we
> > should be able to use PriorityChooser instead of TieredPriorityChooser.
> > Instead of composing BatchingChooser inside TieredPriorityChooser, I would
> > have BatchingChooser wrap a BootstrappingChooser, which wraps a
> > PriorityChooser. Events of the same priority are decided using the
> > PriorityQueue's compare method.
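To make the composition in point 2 concrete, here is a minimal Java sketch of putting batching outside the priority logic. Envelope, Chooser, RoundRobin, TieredPriority, and Batching are hypothetical, simplified stand-ins written for illustration only; they are not the Samza classes in this patch. Bootstrapping is left out to keep it short, and the batching logic assumes (as a simplification) that at most one message per stream is outstanding at a time.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified message: just a stream name and a payload.
final class Envelope {
  final String stream;
  final String payload;
  Envelope(String stream, String payload) { this.stream = stream; this.payload = payload; }
}

// Hypothetical chooser contract: update() offers a message, choose() returns one or null.
interface Chooser {
  void update(Envelope envelope);
  Envelope choose();
}

// Returns buffered messages in arrival order (a FIFO queue).
class RoundRobin implements Chooser {
  private final Deque<Envelope> queue = new ArrayDeque<>();
  public void update(Envelope envelope) { queue.addLast(envelope); }
  public Envelope choose() { return queue.pollFirst(); }
}

// One RoundRobin per priority tier; always serves the highest non-empty tier first.
class TieredPriority implements Chooser {
  private final Map<String, Integer> priorities; // stream name -> priority
  private final NavigableMap<Integer, Chooser> tiers = new TreeMap<>();

  TieredPriority(Map<String, Integer> priorities) {
    this.priorities = priorities;
    for (int priority : priorities.values()) {
      tiers.putIfAbsent(priority, new RoundRobin());
    }
  }

  public void update(Envelope envelope) {
    tiers.get(priorities.get(envelope.stream)).update(envelope);
  }

  public Envelope choose() {
    for (Chooser tier : tiers.descendingMap().values()) {
      Envelope chosen = tier.choose();
      if (chosen != null) {
        return chosen;
      }
    }
    return null;
  }
}

// Keeps returning messages from the last chosen stream until batchSize is reached,
// even if the wrapped chooser would now prefer a higher-priority stream.
class Batching implements Chooser {
  private final Chooser wrapped;
  private final int batchSize;
  private String preferredStream;
  private Envelope preferredEnvelope;
  private int batchCount;

  Batching(Chooser wrapped, int batchSize) {
    this.wrapped = wrapped;
    this.batchSize = batchSize;
  }

  public void update(Envelope envelope) {
    if (envelope.stream.equals(preferredStream)) {
      preferredEnvelope = envelope; // hold it back for the current batch
    } else {
      wrapped.update(envelope);
    }
  }

  public Envelope choose() {
    if (preferredEnvelope != null && batchCount < batchSize) {
      Envelope next = preferredEnvelope;
      preferredEnvelope = null;
      batchCount++;
      return next;
    }
    if (preferredEnvelope != null) {
      wrapped.update(preferredEnvelope); // batch is full: hand the held message back
      preferredEnvelope = null;
    }
    Envelope chosen = wrapped.choose();
    preferredStream = chosen == null ? null : chosen.stream;
    batchCount = chosen == null ? 0 : 1;
    return chosen;
  }
}
```

For example, with `new Batching(new TieredPriority(Map.of("high", 2, "low", 1)), 3)`, choosing "l1" from "low", then receiving "h1" on "high" and "l2" on "low", yields "l2" before "h1": the low-priority batch continues. TieredPriority on its own would return "h1" first in the same sequence, which is the batch interruption point 2 describes.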
The behavior you describe in (2) was intentional. It's a philosophical question
whether batching should override a higher-priority message or not. Thinking on
it more, though, I don't see much downside to the alternative behavior you
describe, and it will lead to less thrashing (batch interrupts) in cases where
the higher-priority stream is low volume. I'll make the change.

A second note, on eliminating the tiered priority chooser: this is a little
trickier if we want to do the composition that you describe in the next
comment. In that case, we'll be given something like RoundRobinChooser (via
task.chooser.class) to break ties for the PriorityChooser. It's unclear to me
how to convert a compare(o1, o2) call into a RoundRobinChooser.choose call,
since a Comparator only orders a pair of messages, while round-robin depends on
state carried across choose calls. The nice thing about the
TieredPriorityChooser is that it does this simply by having one chooser for
each priority tier. The PriorityChooser, on the other hand, is more useful if
you want to build a chooser that prefers messages based on time (for example).

The point I'm trying to convey is that these two choosers do subtly different
things. Off the top of my head, I can't see a good way to use a MessageChooser
inside PriorityChooser's Comparator.compare method. I'll think about it, but
I'm going to keep both if nothing comes to mind. If you have a concrete
proposal for how to do this, let me know, and I'll make the changes.

- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/13725/#review26549
-----------------------------------------------------------


On Sept. 26, 2013, 3:03 a.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/13725/
> -----------------------------------------------------------
> 
> (Updated Sept. 26, 2013, 3:03 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> wrote default chooser test that bootstraps, prioritizes, and batches.
> 
> turning on priority tests for default chooser. refactor default chooser a bit
> to make it easier to test.
> 
> test default chooser with just round robin.
> 
> bug fix to use round robin chooser factory in default chooser
> 
> change wiring for default chooser to make it more testable. add
> start/stop/register tests to all choosers.
> 
> add one more test in priority chooser.
> 
> add unit test for tiered priority chooser
> 
> add unit test for bootstrapping chooser.
> 
> turn default chooser on in samza container. add license to test stateful
> task. add more docs to default chooser. clean up batching unit test.
> 
> add unit test for batching chooser.
> 
> only wire in wrapping message choosers when we need them. add docs to the
> default chooser factory.
> 
> rename class to BootstrappingChooser.
> 
> refactor to move into org.apache.samza.system.chooser.
> 
> build latest message offset map.
> 
> add a streams-behind-chooser that guarantees one message from each
> SystemStream before choose is called.
> 
> add start/stop/register back. all tests pass.
> 
> fix bug -- should allow manual override if Int.MaxInt for bootstrap streams.
> 
> minor bug in default chooser. was re-using same chooser everywhere.
> 
> adding wiring in default chooser.
> 
> initial pass adding composed message choosers.
> 
> rebase to master, which includes SAMZA-25 metrics. fix several tests that
> were broken after removing start/stop/register.
> 
> adding more docs for round robin
> 
> remove start/stop/register
> 
> cleanup some wiki markdown in MessageChooser javadoc.
> 
> add more javadocs to the message chooser.
> 
> Merge branch 'SAMZA-2_fine-grain-control-over-stream-consumption' of
> github.com:criccomini/incubator-samza into
> SAMZA-2_fine-grain-control-over-stream-consumption
> 
> added start, stop, and register to message chooser.
> 
> adding docs for message chooser. switching round robin chooser back to a queue.
> 
> missed license in message chooser factory
> 
> add apache licensing
> 
> samza container was using message chooser, not message chooser factory. fixed.
> 
> add stream chooser test. update stream chooser to invert priority due to bug.
> 
> add round robin test. fix compile error in round robin chooser.
> 
> add priority chooser test. fix bug in priority chooser that was reversing
> ordering.
> 
> adding stream chooser. adding message chooser factory.
> 
> adding priority chooser. moving default chooser to round robin chooser.
> adding config for chooser
> 
> 
> Diffs
> -----
> 
>   docs/learn/documentation/0.7.0/container/streams.md
>     b15c34d3f68e32d56e0da8af91d78e47a5110f67
>   samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
>     306b2902303c72f3d7a3eb313f55d7e88d21e00d
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
>     c902d414484e05ae75c9ca58ad9629cb01120f62
>   samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
>     PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/system/chooser/PriorityChooser.java
>     PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionSystemAdmin.java
>     e4ed30bf363142d82a3c40909e160b5825fe60fd
>   samza-api/src/test/java/org/apache/samza/system/chooser/TestPriorityChooser.java
>     PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
>     PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
>     0c742d83c2f60d2448a79376677713a1ff0b11ec
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
>     62bd243115e71612e00784124baa972b33e56cb7
>   samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
>     5a72e7a3bfba0f06a5a98c6ba26865800d7780b9
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
>     d24671ec64a42ede6f779effe9c845e1cbbc5e51
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BatchingChooser.scala
>     PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
>     PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
>     PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
>     PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala
>     PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
>     PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/MockMessageChooser.scala
>     PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
>     PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
>     PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
>     PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
>     PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
>     PRE-CREATION
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>     183c6ccce39dedaef9dba56d5b61ffdedfc9d08a
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>     7d4e20a8bdc7a45b0a1b464a6f4b868d1d03eab0
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>     68050528cd5f8acfe3a1f7563b4e7fe6c7473be5
> 
> Diff: https://reviews.apache.org/r/13725/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
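Chris's reply above contrasts TieredPriorityChooser, which keeps one chooser per tier, with a Comparator-driven PriorityChooser. The minimal Java sketch below illustrates the Comparator-driven side. Msg and ComparatorChooser are hypothetical types written for illustration, not the PriorityChooser in this patch. The queue's Comparator alone picks the next message, here ordering by priority and then by timestamp, which matches the "prefer messages based on time" use case. Because Comparator.compare only ever orders a pair of messages, there is no natural place in it to consult a stateful chooser such as RoundRobinChooser.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical, simplified message: stream name, priority, and a timestamp.
final class Msg {
  final String stream;
  final int priority;
  final long timestamp;

  Msg(String stream, int priority, long timestamp) {
    this.stream = stream;
    this.priority = priority;
    this.timestamp = timestamp;
  }
}

// A chooser whose entire selection policy is a Comparator over buffered messages.
class ComparatorChooser {
  private final PriorityQueue<Msg> queue;

  ComparatorChooser(Comparator<Msg> order) {
    this.queue = new PriorityQueue<>(order);
  }

  // Higher priority first; among equal priorities, the older message first.
  static Comparator<Msg> byPriorityThenTime() {
    return Comparator.comparingInt((Msg m) -> m.priority)
        .reversed()
        .thenComparingLong(m -> m.timestamp);
  }

  void update(Msg message) {
    queue.add(message);
  }

  // Returns the head of the queue, or null if nothing is buffered.
  Msg choose() {
    return queue.poll();
  }
}
```

Given messages ("a", 1, 10), ("b", 2, 30), and ("c", 2, 20), this chooser returns "c", then "b", then "a". The tie between "b" and "c" is broken by timestamp rather than by round-robin; round-robin tie-breaking would need per-stream state across calls, which is what having one chooser per tier gives TieredPriorityChooser.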
