-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review44817
-----------------------------------------------------------

I'm not wild about SSP* for class names. Thus far the convention has been SystemStreamPartition*. I'd rather have it all one way, or all the other. If the gripe is that the SystemStreamPartition name is too long, that's being discussed in another ticket (rename to Stream/StreamPartition), and I believe you had said you had a completely alternative naming idea as well. Can we just stick with the existing class name convention, and discuss the naming issue separately?

The taskName: String style feels weird to me. First, does it make sense to use a class instead of a string? That was the point of using Partition initially, instead of just an int. Second, it seems to me that taskId is better than taskName now, after seeing it in code. I think either Sriram or Jay was pushing this name as well. What do you think about a TaskId class instead of a Partition class?

I'm not a fan of the way we handle the taskName/partition mapping in the checkpoint manager. Can we just put the mapping in the checkpoint itself, rather than having distinct messages for it?

All new classes need docs (e.g. SSPTaskNameGrouper, GroupByPartition, etc). It'd also be nice to have some docs on the website as well. Maybe in the container section.


build.gradle
<https://reviews.apache.org/r/22215/#comment79359>

    Can we just move SSPGrouperTestBase to samza-core instead? Szczepan says that this method of pulling in test source is not supported, and hacky, so I'd like to limit it as much as possible. The recommended alternative is apparently to create a samza-test submodule that contains all of the test code. Since we already have a samza-test submodule that's really integration tests, we'd have to either move the samza-test stuff to an integration test submodule, or create a second samza-test module (to avoid circular dependencies: samza-core -> samza-test -> samza-core). To me, the easiest fix seems to be to just move the one class to samza-core.

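To make the TaskId suggestion from the general comments concrete, here is a rough sketch of what such a class could look like. Nothing in it comes from the patch: the class shape, the single string field, and the method names are all assumptions, chosen only to show a small immutable value type that can be used as a map key the way Partition is today.

```java
import java.util.Objects;

// Hypothetical value class; not part of the patch under review.
// Wraps the task identifier so call sites pass a TaskId rather than a bare String.
final class TaskId implements Comparable<TaskId> {
    private final String id;

    TaskId(String id) {
        // Reject null up front so equals/hashCode never have to handle it.
        this.id = Objects.requireNonNull(id, "id");
    }

    String getId() {
        return id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TaskId)) {
            return false;
        }
        return id.equals(((TaskId) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public int compareTo(TaskId other) {
        // Natural ordering follows the underlying string.
        return id.compareTo(other.id);
    }

    @Override
    public String toString() {
        return "TaskId [id=" + id + "]";
    }
}
```

With something like this, signatures that currently take taskName: String would take a TaskId, and the compiler catches accidental mixing with other strings.
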
check
<https://reviews.apache.org/r/22215/#comment79357>

    Add license header. Add brief docs describing what this is for.


check
<https://reviews.apache.org/r/22215/#comment79358>

    Add eclipse.


samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79373>

    This all seems a bit hacky. It'd be better if we could have this passed in through the constructor, and not have mutable methods.


samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79360>

    Is this code-gen'd? Want to make sure we're not manually writing equals() since it's error-prone.


samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79361>

    Is this code-gen'd? Want to make sure we're not manually writing hashCode() since it's error-prone.


samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79362>

    specified partition -> specified task


samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79364>

    Why do we need this method if we can get the taskName to partition mapping from checkpoints? Is this just a convenience method to read the mapping across all taskNames? If so, I'd rather keep this out of the interface, and just provide a Util method to handle this.


samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79365>

    Why do we need this method if we can set the taskName to partition mapping for each checkpoint? Is this just a convenience method to write the mapping once for all taskNames, since it's static? If so, I'd rather keep this out of the interface, and just provide a Util method to handle this.

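A rough sketch of the Checkpoint.java and CheckpointManager.java suggestions above: state passed in through the constructor with no mutators, and the taskName mapping derived by a utility method instead of an interface method. Everything below is an assumption for illustration. SketchCheckpoint, CheckpointUtil, and readTaskNameMapping are hypothetical names, and plain strings stand in for the real taskName and SystemStreamPartition types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical immutable checkpoint: all state arrives via the constructor.
final class SketchCheckpoint {
    // Stream-partition name -> last processed offset (strings are stand-ins).
    private final Map<String, String> offsets;

    SketchCheckpoint(Map<String, String> offsets) {
        // Defensive copy, then wrap, so later changes by the caller cannot leak in.
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
    }

    Map<String, String> getOffsets() {
        return offsets;
    }

    // equals/hashCode delegate to the map, so no field-by-field comparison is hand-written.
    @Override
    public boolean equals(Object o) {
        return o instanceof SketchCheckpoint && offsets.equals(((SketchCheckpoint) o).offsets);
    }

    @Override
    public int hashCode() {
        return offsets.hashCode();
    }
}

// Hypothetical utility that keeps the convenience method out of the interface.
final class CheckpointUtil {
    private CheckpointUtil() {
    }

    // Derives taskName -> stream partitions from the checkpoints themselves.
    static Map<String, Set<String>> readTaskNameMapping(Map<String, SketchCheckpoint> checkpointsByTaskName) {
        Map<String, Set<String>> mapping = new HashMap<>();
        for (Map.Entry<String, SketchCheckpoint> entry : checkpointsByTaskName.entrySet()) {
            mapping.put(entry.getKey(), entry.getValue().getOffsets().keySet());
        }
        return Collections.unmodifiableMap(mapping);
    }
}
```

The point of the shape is that the mapping falls out of data each checkpoint already carries, so the manager interface would not need separate read and write methods for it.
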
samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java
<https://reviews.apache.org/r/22215/#comment79366>

    Docs are cute at the expense of readability. Can you just be direct here?


samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
<https://reviews.apache.org/r/22215/#comment79367>

    taskNames? Seems redundant to call it taskNameKeys since taskNames are keys by definition, right?


samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
<https://reviews.apache.org/r/22215/#comment79368>

    Lost the "set" verb in the method name here.


samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
<https://reviews.apache.org/r/22215/#comment79369>

    I'm a little confused here. Isn't the mapping a mapping from taskName to a set of partitions? Why is the type a string here?


samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/22215/#comment79370>

    I'm assuming this is still to-be-implemented. I think you mentioned this in the JIRA.


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment79372>

    Can we just import scala.collection._ to make this a little more succinct?


samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
<https://reviews.apache.org/r/22215/#comment79377>

    I'm confused. These configs are named the same thing. Is this intentional? Seems like maybe SSP_TASK_NAME_GROUPER_FACTORY should be deleted. It's unused.


samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
<https://reviews.apache.org/r/22215/#comment79401>

    Just so I'm clear, this is only used for the state store mapping? If a job doesn't use state stores, this env variable doesn't really get used?

samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79399>

    Can we break all of this into a separate method in the SamzaContainer object, just to try and keep apply() no worse than it was before? The method is already kind of a mess, so I'm trying to not make it worse.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79400>

    This log line is a little dated. There is no longer a partition manager. A more accurate statement would be that the SystemAdmin wasn't able to find any partitions for input streams.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79404>

    Do we need the asJavaCollection stuff? We're importing JavaConversions._ already.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79407>

    Do we need the type here? Seems redundant.


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/22215/#comment79409>

    Any way to make this a little cleaner/more succinct? Totally a nit, but these long scala.* calls just bug me. If you feel strongly about it, I'm OK with the way it is, but just voicing my preference.


samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala
<https://reviews.apache.org/r/22215/#comment79413>

    Oh dear. I feel like the better thing to do here is just to have a custom object that doesn't have to deal with all the Scala junk.


samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala
<https://reviews.apache.org/r/22215/#comment79416>

    This should be plural (TaskNamesToSSPs) since it's a map from multiple taskNames to multiple SSPs.

samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala
<https://reviews.apache.org/r/22215/#comment79415>

    Can we just use the same pattern we've been using here? Either job.num.sets or task.num.sets or something?


samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment79417>

    I think you've renamed it because it clashes with the new "taskName" nomenclature. I don't think this is a jobName, though, right? Isn't it more of a containerName?


samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment79418>

    I am having trouble following this code. I don't really have any good suggestions, but I think it could maybe use a little more cleanup. Maybe I'm in the minority, though. Anyway... I'm confused. :)


samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment79420>

    Clean up.


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment79427>

    I think I'm confused. We can assign multiple taskNames to the same task? I thought we had a 1:1 between TaskInstance -> taskName -> SSPs?


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment79429>

    Skipping review of this class. I think I need a walkthrough to understand what changes are being made here.


samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
<https://reviews.apache.org/r/22215/#comment79432>

    Delete or uncomment. Not sure what you're going for here.


samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
<https://reviews.apache.org/r/22215/#comment79437>

    Still confused. I thought task:taskName mapping was 1:1. Probably forgetting things. Can you explain this to me again?

samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
<https://reviews.apache.org/r/22215/#comment79438>

    Not too excited to delete all of these tests without corresponding tests for taskNames.


- Chris Riccomini


On June 3, 2014, 7:29 p.m., Jakob Homan wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
>
> (Updated June 3, 2014, 7:29 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
>
>
> Repository: samza
>
>
> Description
> -------
>
> Move topic partition grouping to the AM and generalize
>
>
> Diffs
> -----
>
>   build.gradle 1a1db16
>   check PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java dcf81bf
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 34f50fd
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 5aa7a8f
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java 5ec6433
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 611507e
>   samza-api/src/test/java/org/apache/samza/container/SSPGrouperTestBase.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SSPTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 7ca8af6
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySSP.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSSPTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 2ed8d7d
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1b548fd
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 552f8c2
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySSP.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupInNSets.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b8c369b
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 62c91e8
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala dae3c2c
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 5b9b926
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 10502a9
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala c28c9a6
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 17a96f0
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 0442580
>
> Diff: https://reviews.apache.org/r/22215/diff/
>
>
> Testing
> -------
>
> Existing and new unit. Now moving on to function.
>
>
> Thanks,
>
> Jakob Homan
>
